DataX源码分析 writer
系列文章目录
一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel
文章目录
- 系列文章目录
- 前言
- DataX的Writer写入流程
- Writer组件如何处理各类数据源
- writer相关源码
前言
在 DataX 中,writer 是数据同步过程中的一个核心组件,负责将数据写入到目标数据源。下面是对 DataX 中 writer 组件的源码分析:
Writer 接口定义:
DataX 的 writer 组件首先定义了一个 Writer 接口,该接口定义了 writer 需要实现的基本方法,如 init(), write(), post() 等。
不同的数据源插件需要实现这个接口,提供对应的数据写入逻辑。
Writer 插件实现:
对于每种目标数据源,DataX 都会有一个对应的 writer 插件实现。例如,对于 MySQL 数据源,会有一个 MysqlWriter 类实现 Writer 接口。
每个 writer 插件的实现中,会包含与目标数据源交互的逻辑,如建立连接、执行 SQL 语句、批量插入数据等。
Writer 配置:
在 DataX 的 JSON 配置文件中,会指定 writer 的类型和相应的配置参数。
这些配置参数会被传递给 writer 插件的 init() 方法,用于初始化 writer 实例。
数据写入逻辑:
在 write() 方法中,writer 会从上游的 reader 中获取数据,并将其写入到目标数据源。
根据不同的数据源和写入策略,writer 可能会采用批量插入、逐条插入等方式进行数据写入。
writer 还会处理写入过程中的异常和错误,确保数据的完整性和一致性。
Writer 清理和关闭:
在数据写入完成后,writer 会执行 post() 方法,进行一些清理和关闭操作。
这可能包括关闭数据库连接、释放资源等。
通过对 DataX 中 writer 组件的源码分析,我们可以了解到 writer 是如何与目标数据源进行交互的,以及它是如何处理和写入数据的。
DataX的Writer写入流程
- 初始化和准备:
根据配置文件中指定的目标数据源类型和参数,初始化Writer实例。
建立与目标数据源的连接,这通常涉及到网络连接、认证授权等步骤。
准备写入操作所需的各种资源,如缓冲区、事务等。 - 数据接收:
Writer从上游的Reader组件接收数据。这些数据可能是经过转换和处理的,已经符合目标数据源的要求。Writer将数据暂存到本地缓冲区或内存中,等待批量写入或逐条写入。 - 数据格式化和处理:
根据目标数据源的要求,Writer可能需要对接收到的数据进行格式化处理,如将数据转换为特定的文本格式、二进制格式或JSON格式等。 - 数据写入:
Writer将格式化处理后的数据写入目标数据源。写入操作可能涉及到网络通信、数据库操作等。根据目标数据源的特性,Writer会采用批量写入、流式写入等不同的写入方式以提高性能。对于支持事务的数据源,Writer会在每个写入操作前开启一个事务,并在写入完成后提交事务以确保数据的一致性。 - 错误处理和重试:
在写入过程中,Writer需要处理可能出现的各种错误和异常,如网络中断、数据格式错误等。根据配置文件中指定的错误处理策略,Writer可能会进行重试、跳过错误数据、记录错误日志等操作。 - 写入完成和清理:
当所有数据都成功写入目标数据源后,Writer会执行一些清理操作,如关闭数据库连接、释放资源等。Writer还会向上游的Reader或整个DataX任务发送完成信号,以通知整个任务流程已经完成。
Writer组件如何处理各类数据源
不同的数据源具有不同的写入特性和要求,因此Writer组件需要针对不同的数据源实现相应的写入逻辑。以下是一般情况下,DataX Writer组件如何处理各类数据源的大致步骤和考虑因素:
- 数据源连接:
Writer组件首先需要与目标数据源建立连接。这可能涉及到网络通信、认证授权、连接池管理等操作。
根据数据源类型的不同,Writer可能会使用不同的连接协议和库,如JDBC、ODBC、API等。 - 写入前准备:
根据目标数据源的表结构,Writer可能需要创建表、索引或分区。
Writer可能还需要准备写入数据的格式,如文本、二进制、JSON等。
对于支持事务的数据源,Writer可能会开启一个事务来确保数据的一致性。 - 数据写入:
Writer从Reader组件接收数据,并将其写入目标数据源。
根据数据源的特点,Writer可能会采用批量写入、逐条写入、流式写入等不同的写入方式。对于一些支持并行写入的数据源,Writer可能需要将数据分片并分配给多个线程或进程进行并发写入。 - 错误处理:
Writer需要处理写入过程中可能出现的异常和错误,如网络中断、数据格式错误、数据冲突等。
根据不同的错误类型,Writer可能会采取重试、跳过、记录错误日志等不同的处理策略。 - 写入优化:
对于不同的数据源,Writer可能会采用不同的优化策略来提高写入性能,如使用批量插入、调整事务大小、优化网络传输等。
Writer还可能利用目标数据源的特定功能,如批量提交、索引优化等,来进一步提高写入效率。 - 写入后处理:
在数据写入完成后,Writer可能会执行一些后处理操作,如提交事务、关闭连接、清理临时文件等。
对于一些需要额外处理的数据源,Writer可能还会执行数据校验、更新统计信息等操作。 - 扩展性和灵活性:
DataX的Writer组件设计通常具有高度的扩展性和灵活性,以便支持新的数据源类型。通过实现统一的接口和抽象类,可以方便地添加新的Writer插件来支持新的数据源。
总之,DataX的Writer组件通过针对不同数据源实现特定的写入逻辑和优化策略,能够高效地处理各类数据源,并确保数据的正确性和一致性。同时,其扩展性和灵活性的设计也使得DataX能够轻松应对不断变化的数据处理需求。
writer相关源码
/*** 每个Writer插件需要实现Writer类,并在其内部实现Job、Task两个内部类。* * * */
public abstract class Writer extends BaseObject {/*** 每个Writer插件必须实现Job内部类*/public abstract static class Job extends AbstractJobPlugin {/*** 切分任务。<br>* * @param mandatoryNumber* 为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错!* * */public abstract List<Configuration> split(int mandatoryNumber);}/*** 每个Writer插件必须实现Task内部类*/public abstract static class Task extends AbstractTaskPlugin {public abstract void startWrite(RecordReceiver lineReceiver);public boolean supportFailOver(){return false;}}
}
public class MysqlWriter extends Writer {private static final DataBaseType DATABASE_TYPE = DataBaseType.MySql;public static class Job extends Writer.Job {private Configuration originalConfig = null;private CommonRdbmsWriter.Job commonRdbmsWriterJob;@Overridepublic void preCheck(){this.init();this.commonRdbmsWriterJob.writerPreCheck(this.originalConfig, DATABASE_TYPE);}@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);this.commonRdbmsWriterJob.init(this.originalConfig);}// 一般来说,是需要推迟到 task 中进行pre 的执行(单表情况例外)@Overridepublic void prepare() {//实跑先不支持 权限 检验//this.commonRdbmsWriterJob.privilegeValid(this.originalConfig, DATABASE_TYPE);this.commonRdbmsWriterJob.prepare(this.originalConfig);}@Overridepublic List<Configuration> split(int mandatoryNumber) {return this.commonRdbmsWriterJob.split(this.originalConfig, mandatoryNumber);}// 一般来说,是需要推迟到 task 中进行post 的执行(单表情况例外)@Overridepublic void post() {this.commonRdbmsWriterJob.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterJob.destroy(this.originalConfig);}}public static class Task extends Writer.Task {private Configuration writerSliceConfig;private CommonRdbmsWriter.Task commonRdbmsWriterTask;@Overridepublic void init() {this.writerSliceConfig = super.getPluginJobConf();this.commonRdbmsWriterTask = new CommonRdbmsWriter.Task(DATABASE_TYPE);this.commonRdbmsWriterTask.init(this.writerSliceConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterTask.prepare(this.writerSliceConfig);}//TODO 改用连接池,确保每次获取的连接都是可用的(注意:连接可能需要每次都初始化其 session)public void startWrite(RecordReceiver recordReceiver) {this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,super.getTaskPluginCollector());}@Overridepublic void post() {this.commonRdbmsWriterTask.post(this.writerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterTask.destroy(this.writerSliceConfig);}@Overridepublic boolean supportFailOver(){String writeMode = writerSliceConfig.getString(Key.WRITE_MODE);return "replace".equalsIgnoreCase(writeMode);}}}
public class RdbmsWriter extends Writer {private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;static {//加载插件下面配置的驱动类DBUtil.loadDriverClass("writer", "rdbms");}public static class Job extends Writer.Job {private Configuration originalConfig = null;private CommonRdbmsWriter.Job commonRdbmsWriterMaster;@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();// warn:not like mysql, only support insert mode, don't useString writeMode = this.originalConfig.getString(Key.WRITE_MODE);if (null != writeMode) {throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,String.format("写入模式(writeMode)配置有误. 因为不支持配置参数项 writeMode: %s, 仅使用insert sql 插入数据. 请检查您的配置并作出修改.",writeMode));}this.commonRdbmsWriterMaster = new SubCommonRdbmsWriter.Job(DATABASE_TYPE);this.commonRdbmsWriterMaster.init(this.originalConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterMaster.prepare(this.originalConfig);}@Overridepublic List<Configuration> split(int mandatoryNumber) {return this.commonRdbmsWriterMaster.split(this.originalConfig,mandatoryNumber);}@Overridepublic void post() {this.commonRdbmsWriterMaster.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterMaster.destroy(this.originalConfig);}}public static class Task extends Writer.Task {private Configuration writerSliceConfig;private CommonRdbmsWriter.Task commonRdbmsWriterSlave;@Overridepublic void init() {this.writerSliceConfig = super.getPluginJobConf();this.commonRdbmsWriterSlave = new SubCommonRdbmsWriter.Task(DATABASE_TYPE);this.commonRdbmsWriterSlave.init(this.writerSliceConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig);}public void startWrite(RecordReceiver recordReceiver) {this.commonRdbmsWriterSlave.startWrite(recordReceiver,this.writerSliceConfig, super.getTaskPluginCollector());}@Overridepublic void post() {this.commonRdbmsWriterSlave.post(this.writerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);}}}
相关文章:
DataX源码分析 writer
系列文章目录 一、DataX详解和架构介绍 二、DataX源码分析 JobContainer 三、DataX源码分析 TaskGroupContainer 四、DataX源码分析 TaskExecutor 五、DataX源码分析 reader 六、DataX源码分析 writer 七、DataX源码分析 Channel 文章目录 系列文章目录前言DataX的Writer写入流…...
为自己的项目媒体资源添加固定高度
为自己的项目媒体资源添加固定高度 未媒体资源添加固定高度,不仅有利于确定懒加载后的切确位置,还可以做骨架屏、loading动画等等,但是因为历史数据中很多没有加高度的媒体资源,所以一直嫌麻烦没有做。 直到这个季度有一个自上而…...

家政小程序系统源码开发:引领智能生活新篇章
随着科技的飞速发展,小程序作为一种便捷的应用形态,已经深入到我们生活的方方面面。尤其在家庭服务领域,家政小程序的出现为人们带来了前所未有的便利。它不仅简化了家政服务的流程,提升了服务质量,还为家政服务行业注…...
多表查询
目录 统计出一张数据表中的数据量 查询 dept 表中的数据量 查询 emp 表中的数据量 实现 emp 与 dept 的多表查询 笛卡尔积 消除笛卡尔积 把数据表 emp 的别名定为 e,数据表 dept 的别名定为 d,然后在查询中分别使用 e 和 d 代替这两个表 Oracle从…...

PHP开发日志 ━━ 深入理解三元操作与一般条件语句的不同
概况 三元运算符的功能与“if…else”流程语句一致。 在一般情况下,三元操作替换if条件语句可以精简代码,并且更为直观,但是在下面的情况中使用三元操作将会返回警告。 借图: 案例 比如原代码: class classA{publ…...

多维时序 | Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预测
多维时序 | Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预测 目录 多维时序 | Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预…...

vue3-内置组件-Suspense
Suspense (实验性功能) <Suspense> 是一项实验性功能。它不一定会最终成为稳定功能,并且在稳定之前相关 API 也可能会发生变化。 <Suspense> 是一个内置组件,用来在组件树中协调对异步依赖的处理。它让我们可以在组件树上层等待下层的多个嵌…...

Rust入门:如何在windows + vscode中关闭程序codelldb.exe
在windows中用vscode单步调试rust程序的时候,发现无论是按下stop键,还是运行完程序,调试器codelldb.exe一直霸占着主程序不退出,如果此时对代码进行修改,后续就没法再编译调试了。 目前我也不知道要怎么处理这个事&am…...
git错误整理
remote: Support for password authentication was removed on August 13, 2021. 参考:这篇即可 GnuTLS recv error (-110): The TLS connection was non-properly terminated. 执行下面的指令: git config --global http.sslVerify false...

跟着cherno手搓游戏引擎【22】CameraController、Resize
前置: YOTO.h: #pragma once//用于YOTO APP#include "YOTO/Application.h" #include"YOTO/Layer.h" #include "YOTO/Log.h"#include"YOTO/Core/Timestep.h"#include"YOTO/Input.h" #include"YOTO/KeyCod…...

微信小程序(四十二)wechat-http拦截器
注释很详细,直接上代码 上一篇 新增内容: 1.wechat-http请求的封装 2.wechat-http请求的拦截器的用法演示 源码: utils/http.js import http from "wechat-http"//设置全局默认请求地址 http.baseURL "https://live-api.ith…...

tomcat部署zrlog
1.下载zrlog包,并添加到虚拟机中 1)进入/opt/apache-tomcat-8.5.90/webapps目录 cd /opt/apache-tomcat-8.5.90/webapps2)下载zrlog包 wget http://dl.zrlog.com/release/zrlog-1.7.1-baaecb9-release.war 3)重命名包 mv zrlog-1.7.1-baaecb9-release zrblog 2…...
Ubuntu Desktop 开机数字小键盘
Ubuntu Desktop 开机数字小键盘 1. 开机数字小键盘References 1. 开机数字小键盘 一般情况下,Ubuntu 开机后小键盘区是控制方向键而非数字键,每次开机后若用到数字键都需要按下 NumLock 键。 References [1] Yongqiang Cheng, https://yongqiang.blog…...

树莓派编程基础与硬件控制
1.编程语言 Python 是一种泛用型的编程语言,可以用于大量场景的程序开发中。根据基于谷歌搜 索指数的 PYPL(程序语言流行指数)统计,Python 是 2019 年 2 月全球范围内最为流行 的编程语言 相比传统的 C、Java 等编程语言&#x…...

autojs通过正则表达式获取带有数字的text内容
视频连接 视频连接 参考 参考 var ctextMatches(/\d/).findOne()console.log("当前金币"c.text()) // 获取当前金币UiSelector.textMatches(reg) reg {string} | {Regex} 要满足的正则表达式。 为当前选择器附加控件"text需要满足正则表达式reg"的条件。 …...
Android java基础_类的继承
一.Android Java基础_类的继承 先封装一个persion类,在persion的基础上定义Student类,并基础persion类。 子类能访问父类的成员函数。 class Person {private int age;public void setAge(int age) {if (age < 0 || age > 200)age 0;else {thi…...
nginx stream proxy 模块的ssl连接源码分析
目录 1. 源起2. 分析验证环境的配置3. 源码分析3.1 代理模块的请求入口点分析3.2 发起与上游服务器的连接3.3 连接回调3.4 TCP连接建立成功后为上下游数据透传做准备3.5 TCP连接的ssl上下文初始化3.6 ssl握手成功后的处理3.7 连接数据的收与发1. 源起 我一直来对ssl建立连接的过…...
C#面:Static Nested Class 和 Inner Class 有什么不同
这是两种不同的类嵌套方式。 Static Nested Class : 是一个静态嵌套类,它是在外部类中定义的一个静态类。它可以访问外部类的静态成员和方法,但不能直接访问外部类的非静态成员和方法。静态嵌套类可以独立于外部类实例化,即可以…...

LeetCode、208. 实现 Trie (前缀树)【中等,自定义数据结构】
文章目录 前言LeetCode、208. 实现 Trie (前缀树)【中等,自定义数据结构】题目链接与分类思路 资料获取 前言 博主介绍:✌目前全网粉丝2W,csdn博客专家、Java领域优质创作者,博客之星、阿里云平台优质作者、专注于Java后端技术领…...

java数据结构与算法刷题-----LeetCode151. 反转字符串中的单词
java数据结构与算法刷题目录(剑指Offer、LeetCode、ACM)-----主目录-----持续更新(进不去说明我没写完):https://blog.csdn.net/grd_java/article/details/123063846 解题思路 这道题,可以理解为,将字符串颠倒…...
<6>-MySQL表的增删查改
目录 一,create(创建表) 二,retrieve(查询表) 1,select列 2,where条件 三,update(更新表) 四,delete(删除表…...
mongodb源码分析session执行handleRequest命令find过程
mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 St…...

对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...
拉力测试cuda pytorch 把 4070显卡拉满
import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试,通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小,增大可提高计算复杂度duration: 测试持续时间(秒&…...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...

智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制
在数字化浪潮席卷全球的今天,数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具,在大规模数据获取中发挥着关键作用。然而,传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时,常出现数据质…...

HashMap中的put方法执行流程(流程图)
1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中,其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下: 初始判断与哈希计算: 首先,putVal 方法会检查当前的 table(也就…...

网站指纹识别
网站指纹识别 网站的最基本组成:服务器(操作系统)、中间件(web容器)、脚本语言、数据厍 为什么要了解这些?举个例子:发现了一个文件读取漏洞,我们需要读/etc/passwd,如…...

[免费]微信小程序问卷调查系统(SpringBoot后端+Vue管理端)【论文+源码+SQL脚本】
大家好,我是java1234_小锋老师,看到一个不错的微信小程序问卷调查系统(SpringBoot后端Vue管理端)【论文源码SQL脚本】,分享下哈。 项目视频演示 【免费】微信小程序问卷调查系统(SpringBoot后端Vue管理端) Java毕业设计_哔哩哔哩_bilibili 项…...
MinIO Docker 部署:仅开放一个端口
MinIO Docker 部署:仅开放一个端口 在实际的服务器部署中,出于安全和管理的考虑,我们可能只能开放一个端口。MinIO 是一个高性能的对象存储服务,支持 Docker 部署,但默认情况下它需要两个端口:一个是 API 端口(用于存储和访问数据),另一个是控制台端口(用于管理界面…...