DataX源码分析 writer
系列文章目录
一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel
文章目录
- 系列文章目录
- 前言
- DataX的Writer写入流程
- Writer组件如何处理各类数据源
- writer相关源码
前言
在 DataX 中,writer 是数据同步过程中的一个核心组件,负责将数据写入到目标数据源。下面是对 DataX 中 writer 组件的源码分析:
Writer 接口定义:
DataX 的 writer 组件首先定义了一个 Writer 接口,该接口定义了 writer 需要实现的基本方法,如 init(), write(), post() 等。
不同的数据源插件需要实现这个接口,提供对应的数据写入逻辑。
Writer 插件实现:
对于每种目标数据源,DataX 都会有一个对应的 writer 插件实现。例如,对于 MySQL 数据源,会有一个 MysqlWriter 类实现 Writer 接口。
每个 writer 插件的实现中,会包含与目标数据源交互的逻辑,如建立连接、执行 SQL 语句、批量插入数据等。
Writer 配置:
在 DataX 的 JSON 配置文件中,会指定 writer 的类型和相应的配置参数。
这些配置参数会被传递给 writer 插件的 init() 方法,用于初始化 writer 实例。
数据写入逻辑:
在 write() 方法中,writer 会从上游的 reader 中获取数据,并将其写入到目标数据源。
根据不同的数据源和写入策略,writer 可能会采用批量插入、逐条插入等方式进行数据写入。
writer 还会处理写入过程中的异常和错误,确保数据的完整性和一致性。
Writer 清理和关闭:
在数据写入完成后,writer 会执行 post() 方法,进行一些清理和关闭操作。
这可能包括关闭数据库连接、释放资源等。
通过对 DataX 中 writer 组件的源码分析,我们可以了解到 writer 是如何与目标数据源进行交互的,以及它是如何处理和写入数据的。
DataX的Writer写入流程
- 初始化和准备:
根据配置文件中指定的目标数据源类型和参数,初始化Writer实例。
建立与目标数据源的连接,这通常涉及到网络连接、认证授权等步骤。
准备写入操作所需的各种资源,如缓冲区、事务等。 - 数据接收:
Writer从上游的Reader组件接收数据。这些数据可能是经过转换和处理的,已经符合目标数据源的要求。Writer将数据暂存到本地缓冲区或内存中,等待批量写入或逐条写入。 - 数据格式化和处理:
根据目标数据源的要求,Writer可能需要对接收到的数据进行格式化处理,如将数据转换为特定的文本格式、二进制格式或JSON格式等。 - 数据写入:
Writer将格式化处理后的数据写入目标数据源。写入操作可能涉及到网络通信、数据库操作等。根据目标数据源的特性,Writer会采用批量写入、流式写入等不同的写入方式以提高性能。对于支持事务的数据源,Writer会在每个写入操作前开启一个事务,并在写入完成后提交事务以确保数据的一致性。 - 错误处理和重试:
在写入过程中,Writer需要处理可能出现的各种错误和异常,如网络中断、数据格式错误等。根据配置文件中指定的错误处理策略,Writer可能会进行重试、跳过错误数据、记录错误日志等操作。 - 写入完成和清理:
当所有数据都成功写入目标数据源后,Writer会执行一些清理操作,如关闭数据库连接、释放资源等。Writer还会向上游的Reader或整个DataX任务发送完成信号,以通知整个任务流程已经完成。
Writer组件如何处理各类数据源
不同的数据源具有不同的写入特性和要求,因此Writer组件需要针对不同的数据源实现相应的写入逻辑。以下是一般情况下,DataX Writer组件如何处理各类数据源的大致步骤和考虑因素:
- 数据源连接:
Writer组件首先需要与目标数据源建立连接。这可能涉及到网络通信、认证授权、连接池管理等操作。
根据数据源类型的不同,Writer可能会使用不同的连接协议和库,如JDBC、ODBC、API等。 - 写入前准备:
根据目标数据源的表结构,Writer可能需要创建表、索引或分区。
Writer可能还需要准备写入数据的格式,如文本、二进制、JSON等。
对于支持事务的数据源,Writer可能会开启一个事务来确保数据的一致性。 - 数据写入:
Writer从Reader组件接收数据,并将其写入目标数据源。
根据数据源的特点,Writer可能会采用批量写入、逐条写入、流式写入等不同的写入方式。对于一些支持并行写入的数据源,Writer可能需要将数据分片并分配给多个线程或进程进行并发写入。 - 错误处理:
Writer需要处理写入过程中可能出现的异常和错误,如网络中断、数据格式错误、数据冲突等。
根据不同的错误类型,Writer可能会采取重试、跳过、记录错误日志等不同的处理策略。 - 写入优化:
对于不同的数据源,Writer可能会采用不同的优化策略来提高写入性能,如使用批量插入、调整事务大小、优化网络传输等。
Writer还可能利用目标数据源的特定功能,如批量提交、索引优化等,来进一步提高写入效率。 - 写入后处理:
在数据写入完成后,Writer可能会执行一些后处理操作,如提交事务、关闭连接、清理临时文件等。
对于一些需要额外处理的数据源,Writer可能还会执行数据校验、更新统计信息等操作。 - 扩展性和灵活性:
DataX的Writer组件设计通常具有高度的扩展性和灵活性,以便支持新的数据源类型。通过实现统一的接口和抽象类,可以方便地添加新的Writer插件来支持新的数据源。
总之,DataX的Writer组件通过针对不同数据源实现特定的写入逻辑和优化策略,能够高效地处理各类数据源,并确保数据的正确性和一致性。同时,其扩展性和灵活性的设计也使得DataX能够轻松应对不断变化的数据处理需求。
writer相关源码
/*** 每个Writer插件需要实现Writer类,并在其内部实现Job、Task两个内部类。* * * */
public abstract class Writer extends BaseObject {/*** 每个Writer插件必须实现Job内部类*/public abstract static class Job extends AbstractJobPlugin {/*** 切分任务。<br>* * @param mandatoryNumber* 为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错!* * */public abstract List<Configuration> split(int mandatoryNumber);}/*** 每个Writer插件必须实现Task内部类*/public abstract static class Task extends AbstractTaskPlugin {public abstract void startWrite(RecordReceiver lineReceiver);public boolean supportFailOver(){return false;}}
}
public class MysqlWriter extends Writer {private static final DataBaseType DATABASE_TYPE = DataBaseType.MySql;public static class Job extends Writer.Job {private Configuration originalConfig = null;private CommonRdbmsWriter.Job commonRdbmsWriterJob;@Overridepublic void preCheck(){this.init();this.commonRdbmsWriterJob.writerPreCheck(this.originalConfig, DATABASE_TYPE);}@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);this.commonRdbmsWriterJob.init(this.originalConfig);}// 一般来说,是需要推迟到 task 中进行pre 的执行(单表情况例外)@Overridepublic void prepare() {//实跑先不支持 权限 检验//this.commonRdbmsWriterJob.privilegeValid(this.originalConfig, DATABASE_TYPE);this.commonRdbmsWriterJob.prepare(this.originalConfig);}@Overridepublic List<Configuration> split(int mandatoryNumber) {return this.commonRdbmsWriterJob.split(this.originalConfig, mandatoryNumber);}// 一般来说,是需要推迟到 task 中进行post 的执行(单表情况例外)@Overridepublic void post() {this.commonRdbmsWriterJob.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterJob.destroy(this.originalConfig);}}public static class Task extends Writer.Task {private Configuration writerSliceConfig;private CommonRdbmsWriter.Task commonRdbmsWriterTask;@Overridepublic void init() {this.writerSliceConfig = super.getPluginJobConf();this.commonRdbmsWriterTask = new CommonRdbmsWriter.Task(DATABASE_TYPE);this.commonRdbmsWriterTask.init(this.writerSliceConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterTask.prepare(this.writerSliceConfig);}//TODO 改用连接池,确保每次获取的连接都是可用的(注意:连接可能需要每次都初始化其 session)public void startWrite(RecordReceiver recordReceiver) {this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,super.getTaskPluginCollector());}@Overridepublic void post() {this.commonRdbmsWriterTask.post(this.writerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterTask.destroy(this.writerSliceConfig);}@Overridepublic boolean supportFailOver(){String writeMode = writerSliceConfig.getString(Key.WRITE_MODE);return "replace".equalsIgnoreCase(writeMode);}}}
public class RdbmsWriter extends Writer {private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;static {//加载插件下面配置的驱动类DBUtil.loadDriverClass("writer", "rdbms");}public static class Job extends Writer.Job {private Configuration originalConfig = null;private CommonRdbmsWriter.Job commonRdbmsWriterMaster;@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();// warn:not like mysql, only support insert mode, don't useString writeMode = this.originalConfig.getString(Key.WRITE_MODE);if (null != writeMode) {throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,String.format("写入模式(writeMode)配置有误. 因为不支持配置参数项 writeMode: %s, 仅使用insert sql 插入数据. 请检查您的配置并作出修改.",writeMode));}this.commonRdbmsWriterMaster = new SubCommonRdbmsWriter.Job(DATABASE_TYPE);this.commonRdbmsWriterMaster.init(this.originalConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterMaster.prepare(this.originalConfig);}@Overridepublic List<Configuration> split(int mandatoryNumber) {return this.commonRdbmsWriterMaster.split(this.originalConfig,mandatoryNumber);}@Overridepublic void post() {this.commonRdbmsWriterMaster.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterMaster.destroy(this.originalConfig);}}public static class Task extends Writer.Task {private Configuration writerSliceConfig;private CommonRdbmsWriter.Task commonRdbmsWriterSlave;@Overridepublic void init() {this.writerSliceConfig = super.getPluginJobConf();this.commonRdbmsWriterSlave = new SubCommonRdbmsWriter.Task(DATABASE_TYPE);this.commonRdbmsWriterSlave.init(this.writerSliceConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig);}public void startWrite(RecordReceiver recordReceiver) {this.commonRdbmsWriterSlave.startWrite(recordReceiver,this.writerSliceConfig, super.getTaskPluginCollector());}@Overridepublic void post() {this.commonRdbmsWriterSlave.post(this.writerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);}}}
相关文章:
DataX源码分析 writer
系列文章目录 一、DataX详解和架构介绍 二、DataX源码分析 JobContainer 三、DataX源码分析 TaskGroupContainer 四、DataX源码分析 TaskExecutor 五、DataX源码分析 reader 六、DataX源码分析 writer 七、DataX源码分析 Channel 文章目录 系列文章目录前言DataX的Writer写入流…...
为自己的项目媒体资源添加固定高度
为自己的项目媒体资源添加固定高度 未媒体资源添加固定高度,不仅有利于确定懒加载后的切确位置,还可以做骨架屏、loading动画等等,但是因为历史数据中很多没有加高度的媒体资源,所以一直嫌麻烦没有做。 直到这个季度有一个自上而…...

家政小程序系统源码开发:引领智能生活新篇章
随着科技的飞速发展,小程序作为一种便捷的应用形态,已经深入到我们生活的方方面面。尤其在家庭服务领域,家政小程序的出现为人们带来了前所未有的便利。它不仅简化了家政服务的流程,提升了服务质量,还为家政服务行业注…...
多表查询
目录 统计出一张数据表中的数据量 查询 dept 表中的数据量 查询 emp 表中的数据量 实现 emp 与 dept 的多表查询 笛卡尔积 消除笛卡尔积 把数据表 emp 的别名定为 e,数据表 dept 的别名定为 d,然后在查询中分别使用 e 和 d 代替这两个表 Oracle从…...

PHP开发日志 ━━ 深入理解三元操作与一般条件语句的不同
概况 三元运算符的功能与“if…else”流程语句一致。 在一般情况下,三元操作替换if条件语句可以精简代码,并且更为直观,但是在下面的情况中使用三元操作将会返回警告。 借图: 案例 比如原代码: class classA{publ…...

多维时序 | Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预测
多维时序 | Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预测 目录 多维时序 | Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预…...

vue3-内置组件-Suspense
Suspense (实验性功能) <Suspense> 是一项实验性功能。它不一定会最终成为稳定功能,并且在稳定之前相关 API 也可能会发生变化。 <Suspense> 是一个内置组件,用来在组件树中协调对异步依赖的处理。它让我们可以在组件树上层等待下层的多个嵌…...

Rust入门:如何在windows + vscode中关闭程序codelldb.exe
在windows中用vscode单步调试rust程序的时候,发现无论是按下stop键,还是运行完程序,调试器codelldb.exe一直霸占着主程序不退出,如果此时对代码进行修改,后续就没法再编译调试了。 目前我也不知道要怎么处理这个事&am…...
git错误整理
remote: Support for password authentication was removed on August 13, 2021. 参考:这篇即可 GnuTLS recv error (-110): The TLS connection was non-properly terminated. 执行下面的指令: git config --global http.sslVerify false...

跟着cherno手搓游戏引擎【22】CameraController、Resize
前置: YOTO.h: #pragma once//用于YOTO APP#include "YOTO/Application.h" #include"YOTO/Layer.h" #include "YOTO/Log.h"#include"YOTO/Core/Timestep.h"#include"YOTO/Input.h" #include"YOTO/KeyCod…...

微信小程序(四十二)wechat-http拦截器
注释很详细,直接上代码 上一篇 新增内容: 1.wechat-http请求的封装 2.wechat-http请求的拦截器的用法演示 源码: utils/http.js import http from "wechat-http"//设置全局默认请求地址 http.baseURL "https://live-api.ith…...

tomcat部署zrlog
1.下载zrlog包,并添加到虚拟机中 1)进入/opt/apache-tomcat-8.5.90/webapps目录 cd /opt/apache-tomcat-8.5.90/webapps2)下载zrlog包 wget http://dl.zrlog.com/release/zrlog-1.7.1-baaecb9-release.war 3)重命名包 mv zrlog-1.7.1-baaecb9-release zrblog 2…...
Ubuntu Desktop 开机数字小键盘
Ubuntu Desktop 开机数字小键盘 1. 开机数字小键盘References 1. 开机数字小键盘 一般情况下,Ubuntu 开机后小键盘区是控制方向键而非数字键,每次开机后若用到数字键都需要按下 NumLock 键。 References [1] Yongqiang Cheng, https://yongqiang.blog…...

树莓派编程基础与硬件控制
1.编程语言 Python 是一种泛用型的编程语言,可以用于大量场景的程序开发中。根据基于谷歌搜 索指数的 PYPL(程序语言流行指数)统计,Python 是 2019 年 2 月全球范围内最为流行 的编程语言 相比传统的 C、Java 等编程语言&#x…...

autojs通过正则表达式获取带有数字的text内容
视频连接 视频连接 参考 参考 var ctextMatches(/\d/).findOne()console.log("当前金币"c.text()) // 获取当前金币UiSelector.textMatches(reg) reg {string} | {Regex} 要满足的正则表达式。 为当前选择器附加控件"text需要满足正则表达式reg"的条件。 …...
Android java基础_类的继承
一.Android Java基础_类的继承 先封装一个persion类,在persion的基础上定义Student类,并基础persion类。 子类能访问父类的成员函数。 class Person {private int age;public void setAge(int age) {if (age < 0 || age > 200)age 0;else {thi…...
nginx stream proxy 模块的ssl连接源码分析
目录 1. 源起2. 分析验证环境的配置3. 源码分析3.1 代理模块的请求入口点分析3.2 发起与上游服务器的连接3.3 连接回调3.4 TCP连接建立成功后为上下游数据透传做准备3.5 TCP连接的ssl上下文初始化3.6 ssl握手成功后的处理3.7 连接数据的收与发1. 源起 我一直来对ssl建立连接的过…...
C#面:Static Nested Class 和 Inner Class 有什么不同
这是两种不同的类嵌套方式。 Static Nested Class : 是一个静态嵌套类,它是在外部类中定义的一个静态类。它可以访问外部类的静态成员和方法,但不能直接访问外部类的非静态成员和方法。静态嵌套类可以独立于外部类实例化,即可以…...

LeetCode、208. 实现 Trie (前缀树)【中等,自定义数据结构】
文章目录 前言LeetCode、208. 实现 Trie (前缀树)【中等,自定义数据结构】题目链接与分类思路 资料获取 前言 博主介绍:✌目前全网粉丝2W,csdn博客专家、Java领域优质创作者,博客之星、阿里云平台优质作者、专注于Java后端技术领…...

java数据结构与算法刷题-----LeetCode151. 反转字符串中的单词
java数据结构与算法刷题目录(剑指Offer、LeetCode、ACM)-----主目录-----持续更新(进不去说明我没写完):https://blog.csdn.net/grd_java/article/details/123063846 解题思路 这道题,可以理解为,将字符串颠倒…...
uniapp 对接腾讯云IM群组成员管理(增删改查)
UniApp 实战:腾讯云IM群组成员管理(增删改查) 一、前言 在社交类App开发中,群组成员管理是核心功能之一。本文将基于UniApp框架,结合腾讯云IM SDK,详细讲解如何实现群组成员的增删改查全流程。 权限校验…...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...
rnn判断string中第一次出现a的下标
# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问(基础概念问题) 1. 请解释Spring框架的核心容器是什么?它在Spring中起到什么作用? Spring框架的核心容器是IoC容器&#…...
Go语言多线程问题
打印零与奇偶数(leetcode 1116) 方法1:使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...

uniapp 开发ios, xcode 提交app store connect 和 testflight内测
uniapp 中配置 配置manifest 文档:manifest.json 应用配置 | uni-app官网 hbuilderx中本地打包 下载IOS最新SDK 开发环境 | uni小程序SDK hbulderx 版本号:4.66 对应的sdk版本 4.66 两者必须一致 本地打包的资源导入到SDK 导入资源 | uni小程序SDK …...
libfmt: 现代C++的格式化工具库介绍与酷炫功能
libfmt: 现代C的格式化工具库介绍与酷炫功能 libfmt 是一个开源的C格式化库,提供了高效、安全的文本格式化功能,是C20中引入的std::format的基础实现。它比传统的printf和iostream更安全、更灵活、性能更好。 基本介绍 主要特点 类型安全:…...
es6+和css3新增的特性有哪些
一:ECMAScript 新特性(ES6) ES6 (2015) - 革命性更新 1,记住的方法,从一个方法里面用到了哪些技术 1,let /const块级作用域声明2,**默认参数**:函数参数可以设置默认值。3&#x…...