当前位置: 首页 > news >正文

DataX源码分析 writer

系列文章目录

一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel

文章目录

  • 系列文章目录
  • 前言
  • DataX的Writer写入流程
  • Writer组件如何处理各类数据源
  • writer相关源码


前言

在 DataX 中,writer 是数据同步过程中的一个核心组件,负责将数据写入到目标数据源。下面是对 DataX 中 writer 组件的源码分析:
Writer 接口定义:
DataX 的 writer 组件首先定义了一个 Writer 接口,该接口定义了 writer 需要实现的基本方法,如 init(), write(), post() 等。
不同的数据源插件需要实现这个接口,提供对应的数据写入逻辑。
Writer 插件实现:
对于每种目标数据源,DataX 都会有一个对应的 writer 插件实现。例如,对于 MySQL 数据源,会有一个 MysqlWriter 类实现 Writer 接口。
每个 writer 插件的实现中,会包含与目标数据源交互的逻辑,如建立连接、执行 SQL 语句、批量插入数据等。
Writer 配置:
在 DataX 的 JSON 配置文件中,会指定 writer 的类型和相应的配置参数。
这些配置参数会被传递给 writer 插件的 init() 方法,用于初始化 writer 实例。
数据写入逻辑:
在 write() 方法中,writer 会从上游的 reader 中获取数据,并将其写入到目标数据源。
根据不同的数据源和写入策略,writer 可能会采用批量插入、逐条插入等方式进行数据写入。
writer 还会处理写入过程中的异常和错误,确保数据的完整性和一致性。
Writer 清理和关闭:
在数据写入完成后,writer 会执行 post() 方法,进行一些清理和关闭操作。
这可能包括关闭数据库连接、释放资源等。
通过对 DataX 中 writer 组件的源码分析,我们可以了解到 writer 是如何与目标数据源进行交互的,以及它是如何处理和写入数据的。


DataX的Writer写入流程

  • 初始化和准备:
    根据配置文件中指定的目标数据源类型和参数,初始化Writer实例。
    建立与目标数据源的连接,这通常涉及到网络连接、认证授权等步骤。
    准备写入操作所需的各种资源,如缓冲区、事务等。
  • 数据接收:
    Writer从上游的Reader组件接收数据。这些数据可能是经过转换和处理的,已经符合目标数据源的要求。Writer将数据暂存到本地缓冲区或内存中,等待批量写入或逐条写入。
  • 数据格式化和处理:
    根据目标数据源的要求,Writer可能需要对接收到的数据进行格式化处理,如将数据转换为特定的文本格式、二进制格式或JSON格式等。
  • 数据写入:
    Writer将格式化处理后的数据写入目标数据源。写入操作可能涉及到网络通信、数据库操作等。根据目标数据源的特性,Writer会采用批量写入、流式写入等不同的写入方式以提高性能。对于支持事务的数据源,Writer会在每个写入操作前开启一个事务,并在写入完成后提交事务以确保数据的一致性。
  • 错误处理和重试:
    在写入过程中,Writer需要处理可能出现的各种错误和异常,如网络中断、数据格式错误等。根据配置文件中指定的错误处理策略,Writer可能会进行重试、跳过错误数据、记录错误日志等操作。
  • 写入完成和清理:
    当所有数据都成功写入目标数据源后,Writer会执行一些清理操作,如关闭数据库连接、释放资源等。Writer还会向上游的Reader或整个DataX任务发送完成信号,以通知整个任务流程已经完成。

Writer组件如何处理各类数据源

不同的数据源具有不同的写入特性和要求,因此Writer组件需要针对不同的数据源实现相应的写入逻辑。以下是一般情况下,DataX Writer组件如何处理各类数据源的大致步骤和考虑因素:

  • 数据源连接:
    Writer组件首先需要与目标数据源建立连接。这可能涉及到网络通信、认证授权、连接池管理等操作。
    根据数据源类型的不同,Writer可能会使用不同的连接协议和库,如JDBC、ODBC、API等。
  • 写入前准备:
    根据目标数据源的表结构,Writer可能需要创建表、索引或分区。
    Writer可能还需要准备写入数据的格式,如文本、二进制、JSON等。
    对于支持事务的数据源,Writer可能会开启一个事务来确保数据的一致性。
  • 数据写入:
    Writer从Reader组件接收数据,并将其写入目标数据源。
    根据数据源的特点,Writer可能会采用批量写入、逐条写入、流式写入等不同的写入方式。对于一些支持并行写入的数据源,Writer可能需要将数据分片并分配给多个线程或进程进行并发写入。
  • 错误处理:
    Writer需要处理写入过程中可能出现的异常和错误,如网络中断、数据格式错误、数据冲突等。
    根据不同的错误类型,Writer可能会采取重试、跳过、记录错误日志等不同的处理策略。
  • 写入优化:
    对于不同的数据源,Writer可能会采用不同的优化策略来提高写入性能,如使用批量插入、调整事务大小、优化网络传输等。
    Writer还可能利用目标数据源的特定功能,如批量提交、索引优化等,来进一步提高写入效率。
  • 写入后处理:
    在数据写入完成后,Writer可能会执行一些后处理操作,如提交事务、关闭连接、清理临时文件等。
    对于一些需要额外处理的数据源,Writer可能还会执行数据校验、更新统计信息等操作。
  • 扩展性和灵活性:
    DataX的Writer组件设计通常具有高度的扩展性和灵活性,以便支持新的数据源类型。通过实现统一的接口和抽象类,可以方便地添加新的Writer插件来支持新的数据源。

总之,DataX的Writer组件通过针对不同数据源实现特定的写入逻辑和优化策略,能够高效地处理各类数据源,并确保数据的正确性和一致性。同时,其扩展性和灵活性的设计也使得DataX能够轻松应对不断变化的数据处理需求。

writer相关源码


/*** 每个Writer插件需要实现Writer类,并在其内部实现Job、Task两个内部类。* * * */
public abstract class Writer extends BaseObject {/*** 每个Writer插件必须实现Job内部类*/public abstract static class Job extends AbstractJobPlugin {/*** 切分任务。<br>* * @param mandatoryNumber*            为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错!* * */public abstract List<Configuration> split(int mandatoryNumber);}/*** 每个Writer插件必须实现Task内部类*/public abstract static class Task extends AbstractTaskPlugin {public abstract void startWrite(RecordReceiver lineReceiver);public boolean supportFailOver(){return false;}}
}
public class MysqlWriter extends Writer {private static final DataBaseType DATABASE_TYPE = DataBaseType.MySql;public static class Job extends Writer.Job {private Configuration originalConfig = null;private CommonRdbmsWriter.Job commonRdbmsWriterJob;@Overridepublic void preCheck(){this.init();this.commonRdbmsWriterJob.writerPreCheck(this.originalConfig, DATABASE_TYPE);}@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);this.commonRdbmsWriterJob.init(this.originalConfig);}// 一般来说,是需要推迟到 task 中进行pre 的执行(单表情况例外)@Overridepublic void prepare() {//实跑先不支持 权限 检验//this.commonRdbmsWriterJob.privilegeValid(this.originalConfig, DATABASE_TYPE);this.commonRdbmsWriterJob.prepare(this.originalConfig);}@Overridepublic List<Configuration> split(int mandatoryNumber) {return this.commonRdbmsWriterJob.split(this.originalConfig, mandatoryNumber);}// 一般来说,是需要推迟到 task 中进行post 的执行(单表情况例外)@Overridepublic void post() {this.commonRdbmsWriterJob.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterJob.destroy(this.originalConfig);}}public static class Task extends Writer.Task {private Configuration writerSliceConfig;private CommonRdbmsWriter.Task commonRdbmsWriterTask;@Overridepublic void init() {this.writerSliceConfig = super.getPluginJobConf();this.commonRdbmsWriterTask = new CommonRdbmsWriter.Task(DATABASE_TYPE);this.commonRdbmsWriterTask.init(this.writerSliceConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterTask.prepare(this.writerSliceConfig);}//TODO 改用连接池,确保每次获取的连接都是可用的(注意:连接可能需要每次都初始化其 session)public void startWrite(RecordReceiver recordReceiver) {this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,super.getTaskPluginCollector());}@Overridepublic void post() {this.commonRdbmsWriterTask.post(this.writerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterTask.destroy(this.writerSliceConfig);}@Overridepublic boolean supportFailOver(){String writeMode = writerSliceConfig.getString(Key.WRITE_MODE);return "replace".equalsIgnoreCase(writeMode);}}}

public class RdbmsWriter extends Writer {private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;static {//加载插件下面配置的驱动类DBUtil.loadDriverClass("writer", "rdbms");}public static class Job extends Writer.Job {private Configuration originalConfig = null;private CommonRdbmsWriter.Job commonRdbmsWriterMaster;@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();// warn:not like mysql, only support insert mode, don't useString writeMode = this.originalConfig.getString(Key.WRITE_MODE);if (null != writeMode) {throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,String.format("写入模式(writeMode)配置有误. 因为不支持配置参数项 writeMode: %s, 仅使用insert sql 插入数据. 请检查您的配置并作出修改.",writeMode));}this.commonRdbmsWriterMaster = new SubCommonRdbmsWriter.Job(DATABASE_TYPE);this.commonRdbmsWriterMaster.init(this.originalConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterMaster.prepare(this.originalConfig);}@Overridepublic List<Configuration> split(int mandatoryNumber) {return this.commonRdbmsWriterMaster.split(this.originalConfig,mandatoryNumber);}@Overridepublic void post() {this.commonRdbmsWriterMaster.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterMaster.destroy(this.originalConfig);}}public static class Task extends Writer.Task {private Configuration writerSliceConfig;private CommonRdbmsWriter.Task commonRdbmsWriterSlave;@Overridepublic void init() {this.writerSliceConfig = super.getPluginJobConf();this.commonRdbmsWriterSlave = new SubCommonRdbmsWriter.Task(DATABASE_TYPE);this.commonRdbmsWriterSlave.init(this.writerSliceConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig);}public void startWrite(RecordReceiver recordReceiver) {this.commonRdbmsWriterSlave.startWrite(recordReceiver,this.writerSliceConfig, super.getTaskPluginCollector());}@Overridepublic void post() {this.commonRdbmsWriterSlave.post(this.writerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);}}}

相关文章:

DataX源码分析 writer

系列文章目录 一、DataX详解和架构介绍 二、DataX源码分析 JobContainer 三、DataX源码分析 TaskGroupContainer 四、DataX源码分析 TaskExecutor 五、DataX源码分析 reader 六、DataX源码分析 writer 七、DataX源码分析 Channel 文章目录 系列文章目录前言DataX的Writer写入流…...

为自己的项目媒体资源添加固定高度

为自己的项目媒体资源添加固定高度 未媒体资源添加固定高度&#xff0c;不仅有利于确定懒加载后的切确位置&#xff0c;还可以做骨架屏、loading动画等等&#xff0c;但是因为历史数据中很多没有加高度的媒体资源&#xff0c;所以一直嫌麻烦没有做。 直到这个季度有一个自上而…...

家政小程序系统源码开发:引领智能生活新篇章

随着科技的飞速发展&#xff0c;小程序作为一种便捷的应用形态&#xff0c;已经深入到我们生活的方方面面。尤其在家庭服务领域&#xff0c;家政小程序的出现为人们带来了前所未有的便利。它不仅简化了家政服务的流程&#xff0c;提升了服务质量&#xff0c;还为家政服务行业注…...

多表查询

目录 统计出一张数据表中的数据量 查询 dept 表中的数据量 查询 emp 表中的数据量 实现 emp 与 dept 的多表查询 笛卡尔积 消除笛卡尔积 把数据表 emp 的别名定为 e&#xff0c;数据表 dept 的别名定为 d&#xff0c;然后在查询中分别使用 e 和 d 代替这两个表 Oracle从…...

PHP开发日志 ━━ 深入理解三元操作与一般条件语句的不同

概况 三元运算符的功能与“if…else”流程语句一致。 在一般情况下&#xff0c;三元操作替换if条件语句可以精简代码&#xff0c;并且更为直观&#xff0c;但是在下面的情况中使用三元操作将会返回警告。 借图&#xff1a; 案例 比如原代码&#xff1a; class classA{publ…...

多维时序 | Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预测

多维时序 | Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预测 目录 多维时序 | Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预…...

vue3-内置组件-Suspense

Suspense (实验性功能) <Suspense> 是一项实验性功能。它不一定会最终成为稳定功能&#xff0c;并且在稳定之前相关 API 也可能会发生变化。 <Suspense> 是一个内置组件&#xff0c;用来在组件树中协调对异步依赖的处理。它让我们可以在组件树上层等待下层的多个嵌…...

Rust入门:如何在windows + vscode中关闭程序codelldb.exe

在windows中用vscode单步调试rust程序的时候&#xff0c;发现无论是按下stop键&#xff0c;还是运行完程序&#xff0c;调试器codelldb.exe一直霸占着主程序不退出&#xff0c;如果此时对代码进行修改&#xff0c;后续就没法再编译调试了。 目前我也不知道要怎么处理这个事&am…...

git错误整理

remote: Support for password authentication was removed on August 13, 2021. 参考&#xff1a;这篇即可 GnuTLS recv error (-110): The TLS connection was non-properly terminated. 执行下面的指令&#xff1a; git config --global http.sslVerify false...

跟着cherno手搓游戏引擎【22】CameraController、Resize

前置&#xff1a; YOTO.h: #pragma once//用于YOTO APP#include "YOTO/Application.h" #include"YOTO/Layer.h" #include "YOTO/Log.h"#include"YOTO/Core/Timestep.h"#include"YOTO/Input.h" #include"YOTO/KeyCod…...

微信小程序(四十二)wechat-http拦截器

注释很详细&#xff0c;直接上代码 上一篇 新增内容&#xff1a; 1.wechat-http请求的封装 2.wechat-http请求的拦截器的用法演示 源码&#xff1a; utils/http.js import http from "wechat-http"//设置全局默认请求地址 http.baseURL "https://live-api.ith…...

tomcat部署zrlog

1.下载zrlog包&#xff0c;并添加到虚拟机中 1)进入/opt/apache-tomcat-8.5.90/webapps目录 cd /opt/apache-tomcat-8.5.90/webapps2)下载zrlog包 wget http://dl.zrlog.com/release/zrlog-1.7.1-baaecb9-release.war 3)重命名包 mv zrlog-1.7.1-baaecb9-release zrblog 2…...

Ubuntu Desktop 开机数字小键盘

Ubuntu Desktop 开机数字小键盘 1. 开机数字小键盘References 1. 开机数字小键盘 一般情况下&#xff0c;Ubuntu 开机后小键盘区是控制方向键而非数字键&#xff0c;每次开机后若用到数字键都需要按下 NumLock 键。 References [1] Yongqiang Cheng, https://yongqiang.blog…...

树莓派编程基础与硬件控制

1.编程语言 Python 是一种泛用型的编程语言&#xff0c;可以用于大量场景的程序开发中。根据基于谷歌搜 索指数的 PYPL&#xff08;程序语言流行指数&#xff09;统计&#xff0c;Python 是 2019 年 2 月全球范围内最为流行 的编程语言 相比传统的 C、Java 等编程语言&#x…...

autojs通过正则表达式获取带有数字的text内容

视频连接 视频连接 参考 参考 var ctextMatches(/\d/).findOne()console.log("当前金币"c.text()) // 获取当前金币UiSelector.textMatches(reg) reg {string} | {Regex} 要满足的正则表达式。 为当前选择器附加控件"text需要满足正则表达式reg"的条件。 …...

Android java基础_类的继承

一.Android Java基础_类的继承 先封装一个persion类&#xff0c;在persion的基础上定义Student类&#xff0c;并基础persion类。 子类能访问父类的成员函数。 class Person {private int age;public void setAge(int age) {if (age < 0 || age > 200)age 0;else {thi…...

nginx stream proxy 模块的ssl连接源码分析

目录 1. 源起2. 分析验证环境的配置3. 源码分析3.1 代理模块的请求入口点分析3.2 发起与上游服务器的连接3.3 连接回调3.4 TCP连接建立成功后为上下游数据透传做准备3.5 TCP连接的ssl上下文初始化3.6 ssl握手成功后的处理3.7 连接数据的收与发1. 源起 我一直来对ssl建立连接的过…...

C#面:Static Nested Class 和 Inner Class 有什么不同

这是两种不同的类嵌套方式。 Static Nested Class &#xff1a; 是一个静态嵌套类&#xff0c;它是在外部类中定义的一个静态类。它可以访问外部类的静态成员和方法&#xff0c;但不能直接访问外部类的非静态成员和方法。静态嵌套类可以独立于外部类实例化&#xff0c;即可以…...

LeetCode、208. 实现 Trie (前缀树)【中等,自定义数据结构】

文章目录 前言LeetCode、208. 实现 Trie (前缀树)【中等&#xff0c;自定义数据结构】题目链接与分类思路 资料获取 前言 博主介绍&#xff1a;✌目前全网粉丝2W&#xff0c;csdn博客专家、Java领域优质创作者&#xff0c;博客之星、阿里云平台优质作者、专注于Java后端技术领…...

java数据结构与算法刷题-----LeetCode151. 反转字符串中的单词

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 解题思路 这道题&#xff0c;可以理解为&#xff0c;将字符串颠倒&#xf…...

Vue记事本应用实现教程

文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展&#xff1a;显示创建时间8. 功能扩展&#xff1a;记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...

golang循环变量捕获问题​​

在 Go 语言中&#xff0c;当在循环中启动协程&#xff08;goroutine&#xff09;时&#xff0c;如果在协程闭包中直接引用循环变量&#xff0c;可能会遇到一个常见的陷阱 - ​​循环变量捕获问题​​。让我详细解释一下&#xff1a; 问题背景 看这个代码片段&#xff1a; fo…...

2021-03-15 iview一些问题

1.iview 在使用tree组件时&#xff0c;发现没有set类的方法&#xff0c;只有get&#xff0c;那么要改变tree值&#xff0c;只能遍历treeData&#xff0c;递归修改treeData的checked&#xff0c;发现无法更改&#xff0c;原因在于check模式下&#xff0c;子元素的勾选状态跟父节…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)

引言&#xff1a;为什么 Eureka 依然是存量系统的核心&#xff1f; 尽管 Nacos 等新注册中心崛起&#xff0c;但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制&#xff0c;是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)

目录 一、&#x1f44b;&#x1f3fb;前言 二、&#x1f608;sinx波动的基本原理 三、&#x1f608;波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、&#x1f30a;波动优化…...

push [特殊字符] present

push &#x1f19a; present 前言present和dismiss特点代码演示 push和pop特点代码演示 前言 在 iOS 开发中&#xff0c;push 和 present 是两种不同的视图控制器切换方式&#xff0c;它们有着显著的区别。 present和dismiss 特点 在当前控制器上方新建视图层级需要手动调用…...

Qt 事件处理中 return 的深入解析

Qt 事件处理中 return 的深入解析 在 Qt 事件处理中&#xff0c;return 语句的使用是另一个关键概念&#xff0c;它与 event->accept()/event->ignore() 密切相关但作用不同。让我们详细分析一下它们之间的关系和工作原理。 核心区别&#xff1a;不同层级的事件处理 方…...

算术操作符与类型转换:从基础到精通

目录 前言&#xff1a;从基础到实践——探索运算符与类型转换的奥秘 算术操作符超级详解 算术操作符&#xff1a;、-、*、/、% 赋值操作符&#xff1a;和复合赋值 单⽬操作符&#xff1a;、--、、- 前言&#xff1a;从基础到实践——探索运算符与类型转换的奥秘 在先前的文…...

C# winform教程(二)----checkbox

一、作用 提供一个用户选择或者不选的状态&#xff0c;这是一个可以多选的控件。 二、属性 其实功能大差不差&#xff0c;除了特殊的几个外&#xff0c;与button基本相同&#xff0c;所有说几个独有的 checkbox属性 名称内容含义appearance控件外观可以变成按钮形状checkali…...

Spring是如何实现无代理对象的循环依赖

无代理对象的循环依赖 什么是循环依赖解决方案实现方式测试验证 引入代理对象的影响创建代理对象问题分析 源码见&#xff1a;mini-spring 什么是循环依赖 循环依赖是指在对象创建过程中&#xff0c;两个或多个对象相互依赖&#xff0c;导致创建过程陷入死循环。以下通过一个简…...