Datax3.0+DataX-Web部署分布式可视化ETL系统
一、DataX 简介
DataX 是阿里云 DataWorks 数据集成的开源版本,主要就是用于实现数据间的离线同步。DataX 致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源(即不同的数据库) 间稳定高效的数据同步功能。

为了解决异构数据源同步问题,DataX将复杂的网状同步链路变成了星型数据链路,DataX 作为中间传输载体负责连接各种数据源;当需要接入一个新的数据源时,只需要将此数据源对接到 DataX,便能跟已有的数据源作为无缝数据同步。
1.DataX3.0框架设计
DataX 采用 Framework + Plugin 架构,将数据源读取和写入抽象称为 Reader/Writer 插件,纳入到整个同步框架中。

2.DataX3.0核心架构
DataX 完成单个数据同步的作业,我们称为 Job,DataX 接收到一个 Job 后,将启动一个进程来完成整个作业同步过程。DataX Job 模块是单个作业的中枢管理节点,承担了数据清理、子任务切分、TaskGroup 管理等功能。

DataX Job 启动后,会根据不同源端的切分策略,将 Job 切分成多个小的 Task (子任务),以便于并发执行。
接着 DataX Job 会调用 Scheduler 模块,根据配置的并发数量,将拆分成的 Task 重新组合,组装成 TaskGroup(任务组)。
每一个 Task 都由 TaskGroup 负责启动,Task 启动后,会固定启动 Reader --> Channel --> Writer 线程来完成任务同步工作。
DataX 作业运行启动后,Job 会对 TaskGroup 进行监控操作,等待所有 TaskGroup 完成后,Job 便会成功退出(异常退出时值非0)。
3.DataX调度过程
首先 DataX Job 模块会根据分库分表切分成若干个 Task,然后根据用户配置并发数,来计算需要分配多少个 TaskGroup;计算过程:Task / Channel = TaskGroup,最后由 TaskGroup 根据分配好的并发数来运行 Task(任务)。
二、在Linux部署 DataX3.0
DataX下载地址
DataX源码下载地址
因为 CentOS 7 上自带 Python 2.7 的软件包,所以不需要进行安装

1.Linux 上安装 DataX 软件
[root@192 ~]# wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
[root@192 ~]# tar -zxvf datax.tar.gz -C /usr/local/
[root@192 ~]# rm -rf /usr/local/datax/plugin/*/._* # 需要删除隐藏文件 (重要)
当未删除时,可能会输出:[/usr/local/datax/plugin/reader/._drdsreader/plugin.json] 不存在. 请检查您的配置文件.
验证:
[root@192 ~]# cd /usr/local/datax/bin
[root@192 ~]# python datax.py ../job/job.json # 用来验证是否安装成功
输出:
2022-04-24 17:21:04.845 [job-0] INFO JobContainer -
任务启动时刻 : 2022-04-24 17:20:54
任务结束时刻 : 2022-04-24 17:21:04
任务总计耗时 : 10s
任务平均流量 : 253.91KB/s
记录写入速度 : 10000rec/s
读出记录总数 : 100000
读写失败总数 : 0
三、使用 DataX3.0 实现数据同步
例如生成MySQL到MySQL同步的模板:
#输出mysql配置模版
[root@192 bin]# python /usr/local/datax/bin/datax.py -r mysqlreader -w mysqlwriter > /usr/local/datax/job/mysql2mysql.json#根据模板编写 mysql2mysql.json文件
{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"column": ["id","name"], #"*"表示所有字段"connection": [{"jdbcUrl": ["jdbc:mysql://x.x.x.210:3306/mytest"], "table": ["user"]}], "password": "root", "username": "root" }}, "writer": {"name": "mysqlwriter", "parameter": {"column": ["id","name"], "connection": [{"jdbcUrl": "jdbc:mysql://192.168.88.192:3306/mytest", "table": ["user"]}], "password": "root", "username": "root", "writeMode": "insert"}}}], "setting": {"speed": {"channel": "6"}}}
}
验证:
[root@192 job]# python /usr/local/datax/bin/datax.py mysql2mysql.json2022-04-24 17:39:03.445 [job-0] INFO JobContainer -
任务启动时刻 : 2022-04-24 17:38:49
任务结束时刻 : 2022-04-24 17:39:03
任务总计耗时 : 14s
任务平均流量 : 0B/s
记录写入速度 : 0rec/s
读出记录总数 : 3
读写失败总数 : 0
四、DataX-WEB 安装部署
1.下载DataX-Web源码
https://github.com/WeiYe-Jing/datax-web
1.IDEA编译打包
下载链接:https://pan.baidu.com/s/13a8nIpz6FL8y4fdE94trjQ 提取码:data
2.官方提供的版本tar版本包
https://pan.baidu.com/s/13yoqhGpD00I82K4lOYtQhg 提取码:cpsk
2.解压安装包
在选定的安装目录,解压安装包
[root@192 ~]# tar -zxvf datax-web-2.1.2.tar.gz -C /usr/local/dataxweb
3.登录msyql建库
为接下来一键安装部署准备,这里我建的库是dataxweb(自己定义就好,前后保持一致)
mysql> create database dataxweb;
初始化数据库
# 找到 bin/db/datax_web.sql 文件,进行初始化
4.执行一键安装脚本
进入解压后的目录,找到bin目录下面的install.sh文件,如果选择交互式的安装,则直接执行
[root@192 dataxweb]# cd bin/
[root@192 bin]# pwd
/usr/local/dataxweb/bin
[root@192 bin]# ./install.sh
然后按照提示操作即可。包含了数据库初始化,如果你的服务上安装有mysql命令,在执行安装脚本的过程中则会出现以下提醒:
Scan out mysql command, so begin to initalize the database
Do you want to initalize database with sql: [{INSTALL_PATH}/bin/db/datax-web.sql]? (Y/N)y
Please input the db host(default: 127.0.0.1):
Please input the db port(default: 3306):
Please input the db username(default: root):
Please input the db password(default: ): root
Please input the db name(default: dataxweb)
按照提示输入数据库地址,端口号,用户名,密码以及数据库名称,大部分情况下即可快速完成初始化。
如果服务上并没有安装mysql命令,则可以取用目录下/bin/db/datax-web.sql脚本去手动执行,完成后修改相关配置文件
vi modules/datax-admin/conf/bootstrap.properties#Database
DB_HOST=127.0.0.1
DB_PORT=3306
DB_USERNAME=root
DB_PASSWORD=root
DB_DATABASE=dataxweb
按照具体情况配置对应的值即可。
在交互模式下,对各个模块的package压缩包的解压以及configure配置脚本的调用,都会请求用户确认,可根据提示查看是否安装成功,如果没有安装成功,可以重复尝试;如果不想使用交互模式,跳过确认过程,则执行以下命令安装
./bin/install.sh --force
5.其他配置
(1)、邮件服务
在项目目录:modules/datax-admin/bin/env.properties 配置邮件服务(可跳过)MAIL_USERNAME="" MAIL_PASSWORD=""
此文件中包括一些默认配置参数,例如:server.port,具体请查看文件。
(2)、指定PYTHON_PATH的路径
vim modules/datax-executor/bin/env.properties### 执行datax的python脚本地址
PYTHON_PATH=/usr/local/datax/bin/datax.py### 保持和datax-admin服务的端口一致;默认是9527,如果没改datax-admin的端口,可以忽略
DATAX_ADMIN_PORT=
此文件中包括一些默认配置参数,例如:executor.port,json.path,data.path等,具体请查看文件。
6.启动服务
(1)、一键启动所有服务
[root@192 dataxweb]# cd /usr/local/dataxweb/
[root@192 dataxweb]# ./bin/start-all.sh
中途可能发生部分模块启动失败或者卡住,可以退出重复执行,如果需要改变某一模块服务端口号,则:
vi ./modules/{module_name}/bin/env.properties
找到SERVER_PORT配置项,改变它的值即可。当然也可以单一地启动某一模块服务:
./bin/start.sh -m {module_name}module_name可以为datax-admin或datax-executor
(2)、一键停止所有服务
[root@192 dataxweb]# cd /usr/local/dataxweb/
[root@192 dataxweb]# ./bin/stop-all.sh
当然也可以单一地停止某一模块服务:
./bin/stop.sh -m {module_name}
(3)、查看服务
在Linux环境下使用JPS命令,查看是否出现DataXAdminApplication和DataXExecutorApplication进程,如果存在这表示项目运行成功
如果项目启动失败,请检查启动日志:
modules/datax-admin/bin/console.out
或者
modules/datax-executor/bin/console.out
五、DataX-WEB 运行
1.前端界面
部署完成后,在浏览器中输入 http://ip:port/index.html 就可以访问对应的主界面(ip为datax-admin部署所在服务器ip,port为为datax-admin 指定的运行端口9527)

输入用户名 admin 密码 123456 就可以直接访问系统
2.datax-web API
datax-web部署成功后,可以了解datax-web API相关内容,网址: http://ip:port/doc.html

3.DataX-WEB 运行日志
部署完成之后,在modules/对应的项目/data/applogs下(用户也可以自己指定日志,修改application.yml 中的logpath地址即可),用户可以根据此日志跟踪项目实际启动情况
如果执行器启动比admin快,执行器会连接失败,日志报"拒绝连接"的错误,一般是先启动admin,再启动executor,30秒之后会重连,如果成功请忽略这个异常。
六、DataX-WEB 实操
1.查看执行器
查看web界面是否有注册成功的执行器,另外执行器可以根据需要改名称。

2.创建项目

3.路由策略
当执行器集群部署时,提供丰富的路由策略,包括:
FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询):依次分配任务;
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久为使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
阻塞处理策略:调度过于密集执行器来不及处理时的处理策略
•单机串行:调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
•丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
•覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
增量增新建议将阻塞策略设置为丢弃后续调度或者单机串行
设置单机串行时应该注意合理设置重试次数(失败重试的次数* 每次执行时间< 任务的调度周期),重试的次数如果设置的过多会导致数据重复,例如任务30秒执行一次,每次执行时间需要20秒,设置重试三次,如果任务失败了,第一个重试的时间段为1577755680-1577756680,重试任务没结束,新任务又开启,那新任务的时间段会是1577755680-1577758680
4.任务类型
先选择DataX任务,后续配置完详细任务后可以按照下图修改,其它可以根据需求填写。

5. 数据源配置
根据不同数据源,配置参数。


6.任务构建
构建reader

这里没按上面操作生成映射,直接使用任务管理->添加手动配置

{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "root","column": ["*"],"where": " save_time >= FROM_UNIXTIME(${lastTime}) and save_time < FROM_UNIXTIME(${currentTime})","splitPk": "id","connection": [{"table": ["uc_op_amazon_api_store_download"],"jdbcUrl": ["jdbc:mysql://x.x.x.210:3306/test_system"]}]}},"writer": {"name": "mysqlwriter","parameter": {"writeMode": "insert","username": "root","password": "root","column": ["*"],"connection": [{"jdbcUrl": "jdbc:mysql://192.168.88.192:3306/mytest?useUnicode=true&characterEncoding=utf8","table": ["uc_op_amazon_api_store_download"]}]}}}],"setting": {"speed": {"channel": 6}}}
}
-DstartId='%s' -DendId='%s'# 表名
uc_op_business_reports#主键
id#id自增配置条件

相关文章:
Datax3.0+DataX-Web部署分布式可视化ETL系统
一、DataX 简介 DataX 是阿里云 DataWorks 数据集成的开源版本,主要就是用于实现数据间的离线同步。DataX 致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源(即不同的数据库&#x…...
【Java 数据结构】排序
排序算法 1. 排序的概念及引用1.1 排序的概念1.2 常见的排序算法 2. 常见排序算法的实现2.1 插入排序2.1.1 直接插入排序2.1.2 希尔排序( 缩小增量排序 ) 2.2 选择排序2.2.1 直接选择排序2.2.2 堆排序 2.3 交换排序2.3.1冒泡排序2.3.2 快速排序2.3.3 快速排序非递归 2.4 归并排…...
Deepin如何开启与配置SSH实现无公网ip远程连接
文章目录 前言1. 开启SSH服务2. Deppin安装Cpolar3. 配置ssh公网地址4. 公网远程SSH连接5. 固定连接SSH公网地址6. SSH固定地址连接测试 前言 Deepin操作系统是一个基于Debian的Linux操作系统,专注于使用者对日常办公、学习、生活和娱乐的操作体验的极致࿰…...
【Springcloud篇】学习笔记十(十七章):Sentinel实现熔断与限流——Hystrix升级
第十七章_Sentinel实现熔断与限流 1.Sentinel介绍 1.1是什么 随着微服务的流行,服务和服务之间的稳定性变得越来越重要。 Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。 用来代替Hystrix Sentinel 具有…...
【算法与数据结构】718、1143、LeetCode最长重复子数组 最长公共子序列
文章目录 一、718、最长重复子数组二、1143、最长公共子序列三、完整代码 所有的LeetCode题解索引,可以看这篇文章——【算法和数据结构】LeetCode题解。 一、718、最长重复子数组 思路分析: 第一步,动态数组的含义。 d p [ i ] [ j ] dp[i]…...
C# SSH.NET 长命令及时返回
在SSH中执行长时间的命令,SSH.NET及时在文本框中返回连续显示结果。 c# - Execute long time command in SSH.NET and display the results continuously in TextBox - Stack Overflow 博主管理了一个服务器集群,准备上自动巡检工具,测试在…...
Rust学习之Features
Rust学习之Features 一 什么是 Features二 默认 feature三 简单的features应用示例四 可选(optional)的依赖五 依赖的特性5.1 在依赖表中指定5.2 在features表中指定 六 命令行中特性控制七 特性统一路径八 其它8.1 相互排斥特性8.2 观察启用特性8.3 Feature resolver version …...
云计算基础(云计算概述)
目录 一、云计算概述 1.1 云计算的概念 1.1.1 云计算解决的问题 1.1.2 云计算的概念 1.1.3 云计算的组成 1.2 云计算主要特征 1.2.1 按需自助服务 1.2.2 泛在接入 1.2.3 资源池化 1.2.4 快速伸缩性 1.2.5 服务可度量 1.3 云计算服务模式 1.3.1 软件即服务(Softwar…...
【机器学习】科学库使用手册第2篇:机器学习任务和工作流程(已分享,附代码)
本系列文章md笔记(已分享)主要讨论人工智能相关知识。主要内容包括,了解机器学习定义以及应用场景,掌握机器学习基础环境的安装和使用,掌握利用常用的科学计算库对数据进行展示、分析,学会使用jupyter note…...
【React】前端项目引入阿里图标
【React】前端项目引入阿里图标 方式11、登录自己的iconfont-阿里巴巴矢量图标库,把需要的图标加入到自己的项目中去;2、加入并进入到项目中去选择Font class 并下载到本地3、得到的文件夹如下4. 把红框中的部分粘贴到自己的项目中(public 文…...
Javascript入门:第三个知识点:javascript里的数据类型、运算符
数字类型 123 //整数 123.1 //浮点数 1.123e3 //科学计数法 -10 //负数 NaN //not a number Infinity //无限大 以上的类型在javascript里都是数字类型 字符串类型 在开始之前,我需要先说明白两个知识点: console.log()是啥? let 与 v…...
最新版国产会声会影2024新功能爆料
会声会影2024是一个视频编辑软件,具备以下功能: 会声会影2024安装包下载如下: https://wm.makeding.com/iclk/?zoneid55677 1. 视频剪辑:可以对视频进行剪辑、裁剪、拼接和分割操作,实现对视频片段的精确控制。 2. 音频编辑&…...
Pandas处理Excel文件的实用指南 - Python开发技巧XI
处理Excel文件是数据分析师日常工作中的常见任务之一。 幸运的是,Python的Pandas库提供了一套强大的工具,使得读取、处理和写入Excel文件变得既清晰又快捷。 在本篇博客中,我们将探讨如何使用Pandas的 read_excel 方法来读取Excel文件&#x…...
泰克示波器(TBS2000系列)触发功能使用讲解——边沿触发
# Trigger区域 触发区域用于对触发功能进行配置。示波器的触发功能用于采集(Acquire)那些在瞬间出现的信号,便于我们分析观察,此时可以当做逻辑分析仪使用。触发区域按钮包括:menu、Level\Force Trig三个。 目录 1.1 …...
C++学习Day01之C++对C语言增强和扩展
目录 一、程序及输出1.1 全局变量检测增强1.2 函数检测增强1.3 类型转换检测增强1.4 struct增强1.5 bool类型扩展1.6 三目运算符增强1.7 const增强1.7.1 全局Const对比1.7.2 局部Const对比1.7.3 Const变量初始化数组1.7.3 Const修饰变量的链接性 二、分析总结 一、程序及输出 …...
【文件上传WAF绕过】<?绕过、.htaccess木马、.php绕过
🍬 博主介绍👨🎓 博主介绍:大家好,我是 hacker-routing ,很高兴认识大家~ ✨主攻领域:【渗透领域】【应急响应】 【python】 【VulnHub靶场复现】【面试分析】 🎉点赞➕评论➕收藏…...
flutter如何实现省市区选择器
前言 当我们需要用户填写地址时,稳妥的做法是让用户通过“滚轮”来滑动选择省份,市,区,此文采用flutter的第三方库来实现这一功能,比调用高德地图api简单一些。 流程 选择库 这里我选择了一个最近更新且支持中国的…...
Python——将Pyaudio的frame音频数据转换成wave格式
要将pyaudio捕获的音频帧(frame)数据转换成wave模块可以直接处理的格式,通常意味着你需要将这些音频帧数据组装成一个完整的音频流,并确保它们以wave模块期望的格式进行存储。但是,如果你的目的是将这些帧数据直接转换…...
Vue 上门取件时间组件
本文使用vue2.0elementui 制作一个上门取件时间组件,类似顺丰,样式如下: 大概功能:点击期望上门时间,下面出现一个弹框可以选择时间: 首先我们定义一些需要的数据: data() {return {isDropdown…...
学习python第一天
1.输出 print("Hello, World!") 2.退出命令提升符 exit() 3.Python 缩进 实例 if 5 > 2:print("Five is greater than two!") 空格数取决于程序员,但至少需要一个。 您必须在同一代码块中使用相同数量的空格,否则 Python 会…...
学校招生小程序源码介绍
基于ThinkPHPFastAdminUniApp开发的学校招生小程序源码,专为学校招生场景量身打造,功能实用且操作便捷。 从技术架构来看,ThinkPHP提供稳定可靠的后台服务,FastAdmin加速开发流程,UniApp则保障小程序在多端有良好的兼…...
macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用
文章目录 问题现象问题原因解决办法 问题现象 macOS启动台(Launchpad)多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显,都是Google家的办公全家桶。这些应用并不是通过独立安装的…...
【项目实战】通过多模态+LangGraph实现PPT生成助手
PPT自动生成系统 基于LangGraph的PPT自动生成系统,可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析:自动解析Markdown文档结构PPT模板分析:分析PPT模板的布局和风格智能布局决策:匹配内容与合适的PPT布局自动…...
《通信之道——从微积分到 5G》读书总结
第1章 绪 论 1.1 这是一本什么样的书 通信技术,说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号(调制) 把信息从信号中抽取出来&am…...
论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)
笔记整理:刘治强,浙江大学硕士生,研究方向为知识图谱表示学习,大语言模型 论文链接:http://arxiv.org/abs/2407.16127 发表会议:ISWC 2024 1. 动机 传统的知识图谱补全(KGC)模型通过…...
C++.OpenGL (10/64)基础光照(Basic Lighting)
基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...
k8s业务程序联调工具-KtConnect
概述 原理 工具作用是建立了一个从本地到集群的单向VPN,根据VPN原理,打通两个内网必然需要借助一个公共中继节点,ktconnect工具巧妙的利用k8s原生的portforward能力,简化了建立连接的过程,apiserver间接起到了中继节…...
RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 本文全面剖析RNN核心原理,深入讲解梯度消失/爆炸问题,并通过LSTM/GRU结构实现解决方案,提供时间序列预测和文本生成…...
蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...
Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)
目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...
