简化数据流:Apache SeaTunnel实现多表同步的高效指南
Apache SeaTunnel除了单表之间的数据同步之外,也支持单表同步到多表,多表同步到单表,以及多表同步到多表,下面简单举例说明如何实现这些功能。
单表 to 单表
一个source,一个sink。
从mysql同步到mysql,中间不做区分
env {# You can set flink configuration hereexecution.parallelism = 2job.mode = "BATCH"
}
source{Jdbc {url = "jdbc:mysql://127.0.0.1:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"query = "select * from base_region"}
}transform {# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,# please go to https://seatunnel.apache.org/docs/transform/sql
}sink {jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"query = "insert into base_region(id,region_name) values(?,?)"}
}
执行任务
./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf
单表 to 多表
一个source,多个sink。
从MySQL同步到MySQL,将一个用户表数据同步过去,中间通过2个sql组件分布将男性用户和女性用户分开,在sink阶段分别插入到不同的表:
env {execution.parallelism = 2job.mode = "BATCH"
}
source {Jdbc {url = "jdbc:mysql://127.0.0.1:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"result_table_name="t_user"query = "select * from t_user;"}
}transform {Sql {source_table_name = "t_user"result_table_name = "t_user_nan"query = "select id,name,birth,gender from t_user where gender ='男';"}Sql {source_table_name = "t_user"result_table_name = "t_user_nv"query = "select id,name,birth,gender from t_user where gender ='女';"}
}sink {jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"source_table_name = "t_user_nan"query = "insert into t_user_nan(id,name,birth,gender) values(?,?,?,?)"}jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"source_table_name = "t_user_nv"query = "insert into t_user_nv(id,name,birth,gender) values(?,?,?,?)"}
}
./bin/seatunnel.sh --config ./config/mysql2mysql_1n.conf
多表 to 单表
多个source,一个sink。
假如有一张交换器使用情况表,一张路由器使用情况表,目标表是将这种数据合在一起的olap表。
表结构如下:
-- dw 源表1
CREATE TABLE IF NOT EXISTS ads_device_switch_performance (`event_time` timestamp COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) COMMENT '设备名称',`cpu_usage` INT COMMENT 'CPU使用率百分比'
) ;INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-15 14:25:11', '2001', '2', '交换器1', 49);
INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-17 22:25:40', '2002', '1', '交换器2', 65);-- dw 源表2
CREATE TABLE IF NOT EXISTS ads_device_router_performance (`event_time` timestamp COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) COMMENT '设备名称',`cpu_usage` INT COMMENT 'CPU使用率百分比'
);INSERT INTO `ads_device_router_performance` VALUES ('2024-01-17 21:23:22', '1001', '1', '路由器1', 35);
INSERT INTO `ads_device_router_performance` VALUES ('2024-01-16 17:23:53', '1002', '2', '路由器2', 46);-------------------------------------------------------------------------------
-- olap 目标表
CREATE TABLE `device_performance` (`id` INT NOT NULL AUTO_INCREMENT COMMENT '表主键',`event_time` VARCHAR(32) NOT NULL COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) NOT NULL COMMENT '设备名称',`cpu_usage` FLOAT NOT NULL COMMENT 'CPU利用率单位是%',`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`)
) COMMENT='设备状态';
将交换器数据和路由器数据一起同步到olap目标表,总结通过sql组件处理:
env {job.mode="BATCH"job.name="device_performance"
}source {Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="switch_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="router_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"}
}transform {Sql {source_table_name = "switch_src"result_table_name = "switch_dst"query = "SELECT event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM switch_src;"}Sql {source_table_name = "router_src"result_table_name = "router_dst"query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"}
}sink {Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "switch_dst"query="INSERT INTO device_performance VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "router_dst"query="INSERT INTO device_performance VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}
}
执行任务:
./bin/seatunnel.sh --config ./syn_job/mysql2mysql_n1_batch.conf
作业成功!
多表 to 多表
多个source,多个sink。
将交换器使用情况数据和路由器使用情况数据分别同步到对应的目标表,中间sql组件处理
env {job.mode="BATCH"job.name="device_performance"
}source {Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="switch_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="router_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"}
}transform {Sql {source_table_name = "switch_src"result_table_name = "switch_dst"query = "SELECT event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM switch_src;"}Sql {source_table_name = "router_src"result_table_name = "router_dst"query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"}
}sink {Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "switch_dst"query="INSERT INTO device_performance_switch VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "router_dst"query="INSERT INTO device_performance_router VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}
}
结语
综上所述,Apache SeaTunnel多表同步技术具有高效、实时、可靠和灵活的特点,在企业的数据同步领域发挥着重要作用。借助Apache SeaTunnel多表同步功能,企业能够更好地实现不同系统和数据库之间数据的无缝流转,提升数据管理和利用的效率,为业务发展提供有力支持。希望本文能够帮助读者更好地了解和应用Apache SeaTunnel多表同步,从而为企业数据同步带来更多可能性。
原文链接:https://blog.csdn.net/weixin_44586883/article/details/136049897
本文由 白鲸开源科技 提供发布支持!
相关文章:
简化数据流:Apache SeaTunnel实现多表同步的高效指南
Apache SeaTunnel除了单表之间的数据同步之外,也支持单表同步到多表,多表同步到单表,以及多表同步到多表,下面简单举例说明如何实现这些功能。 单表 to 单表 一个source,一个sink。 从mysql同步到mysql,…...
均匀圆形阵列原理及MATLAB仿真
均匀圆形阵列原理及MATLAB仿真 目录 前言 一、均匀圆阵原理 二、圆心不存在阵元方向图仿真 三、圆心存在阵元方向图仿真 四、MATLAB仿真代码 总结 前言 本文详细推导了均匀圆形阵列的方向图函数,对圆心不放置阵元和圆心放置阵元的均匀圆形阵列方向图都进行了仿…...
vue2使用univerjs
1、univerjs Univer 提供了一个全面的企业级文档与数据协同的解决方案,支持电子表格、文本文档和演示幻灯片三大核心文档类型。通过灵活的 API 和插件机制,开发者可以在 Univer 的基础上进行个性化功能的定制和扩展,以适应不同用户在不同场景…...
VUE3 el-table-column header新增必填*
1.在需要加必填星号的el-table-column上添加render-header属性 <el-table-column :label"getName(产品代码)" :render-header"addRedStart" prop"MODELCODE" min-width“4.5%”> <template v-slot"scope"> <el-input …...
条件概率和贝叶斯公式
...
Kali中docker与docker-compose的配置
权限升级 sudo su 升级为root用户 更新软件 apt-get update安装HTTPS协议和CA证书 apt-get install -y apt-transport-https ca-certificates下载docker apt下载docker apt install docker.io 验证docker安装是否成功 查版本 docker -v 启动docker systemctl start …...
C++ | Leetcode C++题解之第283题移动零
题目: 题解: class Solution { public:void moveZeroes(vector<int>& nums) {int n nums.size(), left 0, right 0;while (right < n) {if (nums[right]) {swap(nums[left], nums[right]);left;}right;}} };...
Exponential Moving Average (EMA) in Stable Diffusion
1.Moving Average in Stable Diffusion (SMA&EMA) 1.Moving average 2.移动平均值 3.How We Trained Stable Diffusion for Less than $50k (Part 3) Moving Average 在统计学中,移动平均是通过创建整个数据集中不同选择的一系列平均值来分析数据点的计算。 …...
017、Vue动态tag标签
文章目录 1、先看效果2、代码 1、先看效果 2、代码 <template><div class "tags"><el-tag size"medium"closable v-for"item,index in tags":key"item.path":effect"item.title$route.name?dark:plain"cl…...
RocketMQ 架构概览
Apache RocketMQ 是一个分布式消息中间件和流计算平台,提供低延迟、高性能和可靠的队列服务,并且支持大规模的分布式系统。在详细介绍 RocketMQ 的整体架构之前,先了解其设计目标和核心特性是很重要的。RocketMQ 主要用于处理大规模的消息&am…...
优化医疗数据管理:Kettle ETL 数据采集方案详解
在现代医疗保健领域,数据的准确性、完整性和及时性对于提高医疗服务质量和患者护理至关重要。为了有效管理和利用医疗数据,Kettle ETL(Extract, Transform, Load)数据采集方案成为了许多医疗机构的首选工具之一。本文将深入探讨Ke…...
spring-from表单
在spring boot当中,from表单怎样开发(name=value) 先列出接口所需信息(抓包得到请求信息),将这些必要信息以注解的方式表达出来 步骤: 梳理前置条件(请求地址,请求header,请求方法,请求数据,响应结果)编辑一个普通类,在类上标记注解@Controller: 标记在类上,让类…...
【.NET】asp.net core 程序重启容器后redis无法连接,连接超时
环境是容器化部署asp.net core 程序当有大量请求打到容器如果此时重启容器会出现,redis无法连接情况。 使用 csredis 库报错: Status unavailable, waiting for recovery. Connect to server timeout 使用StackExchange.Redis 报错: Time…...
【vue前端项目实战案例】Vue3仿今日头条App
本文将开发一款仿“今日头条”的新闻App。该案例是基于 Vue3.0 Vue Router webpack TypeScript 等技术栈实现的一款新闻资讯类App,适合有一定Vue框架使用经验的开发者进行学习。 项目源码在文章末尾 1 项目概述 该项目是一款“今日头条”的新闻资讯App…...
常见的文心一言的指令
文心一言,作为百度研发的预训练语言模型“ERNIE 3.0”的一项功能,能够与人对话互动,回答问题,协助创作,高效便捷地帮助人们获取信息、知识和灵感。以下是一些常见的文心一言指令类型及其具体示例: 1. 查询…...
数字货币交易接口实现(含源代码)
数字货币交易接口实现(含源代码) 使用币安交易接口步骤1:注册API密钥步骤2:安装所需库步骤3:使用API进行交易获取市场数据查看账户信息执行交易错误处理安全提示 使用OKX交易接口步骤1:注册API密钥步骤2&am…...
c++函数以及函数分文件编写
1.函数 1.1格式 返回值类型 函数名 (参数列表)//返回值类型指的是return过去的类型 { 函数体语句 return 表达式 } 1.2常见的函数样式 1.无参返回 2.有参返回 3.无参有返 4.有参有返 #include<iostream> using namespace std; int add(int nu…...
【JVM基础06】——组成-直接内存详解
目录 1- 引言:直接内存概述1-1 直接内存是什么?直接内存的定义(What)1-2 为什么用直接内存?Java程序对直接内存的使用 (Why) 2- ⭐核心:详解直接内存(How)2-1 文件拷贝案例介绍对比常规 IO(BIO) 和 NIO常规 IO 的操作流程NIO 的操…...
学术研讨 | 区块链与隐私计算领域专用硬件研讨会顺利召开
学术研讨 近日,国家区块链技术创新中心主办,长安链开源社区支持的“区块链与隐私计算领域专用硬件研讨会”顺利召开,会议围绕基于区块链与隐私计算的生成式AI上链、硬件加速、软硬协同等主题展开讨论,来自复旦大学、清华大学、北京…...
AngularJS API 深入解析
AngularJS API 深入解析 AngularJS,作为一个强大且灵活的JavaScript框架,自从其诞生以来,就一直是前端开发者构建复杂Web应用的首选工具。本文将深入探讨AngularJS的API,帮助读者理解其核心功能和工作原理。 AngularJS简介 AngularJS由Google开发,并于2010年发布。它是…...
springboot 百货中心供应链管理系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,百货中心供应链管理系统被用户普遍使用,为方…...
基于ASP.NET+ SQL Server实现(Web)医院信息管理系统
医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上,开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识,在 vs 2017 平台上,进行 ASP.NET 应用程序和简易网站的开发;初步熟悉开发一…...
【项目实战】通过多模态+LangGraph实现PPT生成助手
PPT自动生成系统 基于LangGraph的PPT自动生成系统,可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析:自动解析Markdown文档结构PPT模板分析:分析PPT模板的布局和风格智能布局决策:匹配内容与合适的PPT布局自动…...
(二)原型模式
原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...
DBAPI如何优雅的获取单条数据
API如何优雅的获取单条数据 案例一 对于查询类API,查询的是单条数据,比如根据主键ID查询用户信息,sql如下: select id, name, age from user where id #{id}API默认返回的数据格式是多条的,如下: {&qu…...
IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)
文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...
MySQL用户和授权
开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务: test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...
Razor编程中@Html的方法使用大全
文章目录 1. 基础HTML辅助方法1.1 Html.ActionLink()1.2 Html.RouteLink()1.3 Html.Display() / Html.DisplayFor()1.4 Html.Editor() / Html.EditorFor()1.5 Html.Label() / Html.LabelFor()1.6 Html.TextBox() / Html.TextBoxFor() 2. 表单相关辅助方法2.1 Html.BeginForm() …...
从“安全密码”到测试体系:Gitee Test 赋能关键领域软件质量保障
关键领域软件测试的"安全密码":Gitee Test如何破解行业痛点 在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的"神经中枢"。从国防军工到能源电力,从金融交易到交通管控,这些关乎国计民生的关键领域…...
协议转换利器,profinet转ethercat网关的两大派系,各有千秋
随着工业以太网的发展,其高效、便捷、协议开放、易于冗余等诸多优点,被越来越多的工业现场所采用。西门子SIMATIC S7-1200/1500系列PLC集成有Profinet接口,具有实时性、开放性,使用TCP/IP和IT标准,符合基于工业以太网的…...
