当前位置: 首页 > news >正文

DataX源码分析 writer

系列文章目录

一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel

文章目录

  • 系列文章目录
  • 前言
  • DataX的Writer写入流程
  • Writer组件如何处理各类数据源
  • writer相关源码


前言

在 DataX 中,writer 是数据同步过程中的一个核心组件,负责将数据写入到目标数据源。下面是对 DataX 中 writer 组件的源码分析:
Writer 接口定义:
DataX 的 writer 组件首先定义了一个 Writer 接口,该接口定义了 writer 需要实现的基本方法,如 init(), write(), post() 等。
不同的数据源插件需要实现这个接口,提供对应的数据写入逻辑。
Writer 插件实现:
对于每种目标数据源,DataX 都会有一个对应的 writer 插件实现。例如,对于 MySQL 数据源,会有一个 MysqlWriter 类实现 Writer 接口。
每个 writer 插件的实现中,会包含与目标数据源交互的逻辑,如建立连接、执行 SQL 语句、批量插入数据等。
Writer 配置:
在 DataX 的 JSON 配置文件中,会指定 writer 的类型和相应的配置参数。
这些配置参数会被传递给 writer 插件的 init() 方法,用于初始化 writer 实例。
数据写入逻辑:
在 write() 方法中,writer 会从上游的 reader 中获取数据,并将其写入到目标数据源。
根据不同的数据源和写入策略,writer 可能会采用批量插入、逐条插入等方式进行数据写入。
writer 还会处理写入过程中的异常和错误,确保数据的完整性和一致性。
Writer 清理和关闭:
在数据写入完成后,writer 会执行 post() 方法,进行一些清理和关闭操作。
这可能包括关闭数据库连接、释放资源等。
通过对 DataX 中 writer 组件的源码分析,我们可以了解到 writer 是如何与目标数据源进行交互的,以及它是如何处理和写入数据的。


DataX的Writer写入流程

  • 初始化和准备:
    根据配置文件中指定的目标数据源类型和参数,初始化Writer实例。
    建立与目标数据源的连接,这通常涉及到网络连接、认证授权等步骤。
    准备写入操作所需的各种资源,如缓冲区、事务等。
  • 数据接收:
    Writer从上游的Reader组件接收数据。这些数据可能是经过转换和处理的,已经符合目标数据源的要求。Writer将数据暂存到本地缓冲区或内存中,等待批量写入或逐条写入。
  • 数据格式化和处理:
    根据目标数据源的要求,Writer可能需要对接收到的数据进行格式化处理,如将数据转换为特定的文本格式、二进制格式或JSON格式等。
  • 数据写入:
    Writer将格式化处理后的数据写入目标数据源。写入操作可能涉及到网络通信、数据库操作等。根据目标数据源的特性,Writer会采用批量写入、流式写入等不同的写入方式以提高性能。对于支持事务的数据源,Writer会在每个写入操作前开启一个事务,并在写入完成后提交事务以确保数据的一致性。
  • 错误处理和重试:
    在写入过程中,Writer需要处理可能出现的各种错误和异常,如网络中断、数据格式错误等。根据配置文件中指定的错误处理策略,Writer可能会进行重试、跳过错误数据、记录错误日志等操作。
  • 写入完成和清理:
    当所有数据都成功写入目标数据源后,Writer会执行一些清理操作,如关闭数据库连接、释放资源等。Writer还会向上游的Reader或整个DataX任务发送完成信号,以通知整个任务流程已经完成。

Writer组件如何处理各类数据源

不同的数据源具有不同的写入特性和要求,因此Writer组件需要针对不同的数据源实现相应的写入逻辑。以下是一般情况下,DataX Writer组件如何处理各类数据源的大致步骤和考虑因素:

  • 数据源连接:
    Writer组件首先需要与目标数据源建立连接。这可能涉及到网络通信、认证授权、连接池管理等操作。
    根据数据源类型的不同,Writer可能会使用不同的连接协议和库,如JDBC、ODBC、API等。
  • 写入前准备:
    根据目标数据源的表结构,Writer可能需要创建表、索引或分区。
    Writer可能还需要准备写入数据的格式,如文本、二进制、JSON等。
    对于支持事务的数据源,Writer可能会开启一个事务来确保数据的一致性。
  • 数据写入:
    Writer从Reader组件接收数据,并将其写入目标数据源。
    根据数据源的特点,Writer可能会采用批量写入、逐条写入、流式写入等不同的写入方式。对于一些支持并行写入的数据源,Writer可能需要将数据分片并分配给多个线程或进程进行并发写入。
  • 错误处理:
    Writer需要处理写入过程中可能出现的异常和错误,如网络中断、数据格式错误、数据冲突等。
    根据不同的错误类型,Writer可能会采取重试、跳过、记录错误日志等不同的处理策略。
  • 写入优化:
    对于不同的数据源,Writer可能会采用不同的优化策略来提高写入性能,如使用批量插入、调整事务大小、优化网络传输等。
    Writer还可能利用目标数据源的特定功能,如批量提交、索引优化等,来进一步提高写入效率。
  • 写入后处理:
    在数据写入完成后,Writer可能会执行一些后处理操作,如提交事务、关闭连接、清理临时文件等。
    对于一些需要额外处理的数据源,Writer可能还会执行数据校验、更新统计信息等操作。
  • 扩展性和灵活性:
    DataX的Writer组件设计通常具有高度的扩展性和灵活性,以便支持新的数据源类型。通过实现统一的接口和抽象类,可以方便地添加新的Writer插件来支持新的数据源。

总之,DataX的Writer组件通过针对不同数据源实现特定的写入逻辑和优化策略,能够高效地处理各类数据源,并确保数据的正确性和一致性。同时,其扩展性和灵活性的设计也使得DataX能够轻松应对不断变化的数据处理需求。

writer相关源码


/*** 每个Writer插件需要实现Writer类,并在其内部实现Job、Task两个内部类。* * * */
public abstract class Writer extends BaseObject {/*** 每个Writer插件必须实现Job内部类*/public abstract static class Job extends AbstractJobPlugin {/*** 切分任务。<br>* * @param mandatoryNumber*            为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错!* * */public abstract List<Configuration> split(int mandatoryNumber);}/*** 每个Writer插件必须实现Task内部类*/public abstract static class Task extends AbstractTaskPlugin {public abstract void startWrite(RecordReceiver lineReceiver);public boolean supportFailOver(){return false;}}
}
public class MysqlWriter extends Writer {private static final DataBaseType DATABASE_TYPE = DataBaseType.MySql;public static class Job extends Writer.Job {private Configuration originalConfig = null;private CommonRdbmsWriter.Job commonRdbmsWriterJob;@Overridepublic void preCheck(){this.init();this.commonRdbmsWriterJob.writerPreCheck(this.originalConfig, DATABASE_TYPE);}@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);this.commonRdbmsWriterJob.init(this.originalConfig);}// 一般来说,是需要推迟到 task 中进行pre 的执行(单表情况例外)@Overridepublic void prepare() {//实跑先不支持 权限 检验//this.commonRdbmsWriterJob.privilegeValid(this.originalConfig, DATABASE_TYPE);this.commonRdbmsWriterJob.prepare(this.originalConfig);}@Overridepublic List<Configuration> split(int mandatoryNumber) {return this.commonRdbmsWriterJob.split(this.originalConfig, mandatoryNumber);}// 一般来说,是需要推迟到 task 中进行post 的执行(单表情况例外)@Overridepublic void post() {this.commonRdbmsWriterJob.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterJob.destroy(this.originalConfig);}}public static class Task extends Writer.Task {private Configuration writerSliceConfig;private CommonRdbmsWriter.Task commonRdbmsWriterTask;@Overridepublic void init() {this.writerSliceConfig = super.getPluginJobConf();this.commonRdbmsWriterTask = new CommonRdbmsWriter.Task(DATABASE_TYPE);this.commonRdbmsWriterTask.init(this.writerSliceConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterTask.prepare(this.writerSliceConfig);}//TODO 改用连接池,确保每次获取的连接都是可用的(注意:连接可能需要每次都初始化其 session)public void startWrite(RecordReceiver recordReceiver) {this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,super.getTaskPluginCollector());}@Overridepublic void post() {this.commonRdbmsWriterTask.post(this.writerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterTask.destroy(this.writerSliceConfig);}@Overridepublic boolean supportFailOver(){String writeMode = writerSliceConfig.getString(Key.WRITE_MODE);return "replace".equalsIgnoreCase(writeMode);}}}

public class RdbmsWriter extends Writer {private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;static {//加载插件下面配置的驱动类DBUtil.loadDriverClass("writer", "rdbms");}public static class Job extends Writer.Job {private Configuration originalConfig = null;private CommonRdbmsWriter.Job commonRdbmsWriterMaster;@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();// warn:not like mysql, only support insert mode, don't useString writeMode = this.originalConfig.getString(Key.WRITE_MODE);if (null != writeMode) {throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,String.format("写入模式(writeMode)配置有误. 因为不支持配置参数项 writeMode: %s, 仅使用insert sql 插入数据. 请检查您的配置并作出修改.",writeMode));}this.commonRdbmsWriterMaster = new SubCommonRdbmsWriter.Job(DATABASE_TYPE);this.commonRdbmsWriterMaster.init(this.originalConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterMaster.prepare(this.originalConfig);}@Overridepublic List<Configuration> split(int mandatoryNumber) {return this.commonRdbmsWriterMaster.split(this.originalConfig,mandatoryNumber);}@Overridepublic void post() {this.commonRdbmsWriterMaster.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterMaster.destroy(this.originalConfig);}}public static class Task extends Writer.Task {private Configuration writerSliceConfig;private CommonRdbmsWriter.Task commonRdbmsWriterSlave;@Overridepublic void init() {this.writerSliceConfig = super.getPluginJobConf();this.commonRdbmsWriterSlave = new SubCommonRdbmsWriter.Task(DATABASE_TYPE);this.commonRdbmsWriterSlave.init(this.writerSliceConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig);}public void startWrite(RecordReceiver recordReceiver) {this.commonRdbmsWriterSlave.startWrite(recordReceiver,this.writerSliceConfig, super.getTaskPluginCollector());}@Overridepublic void post() {this.commonRdbmsWriterSlave.post(this.writerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);}}}

相关文章:

DataX源码分析 writer

系列文章目录 一、DataX详解和架构介绍 二、DataX源码分析 JobContainer 三、DataX源码分析 TaskGroupContainer 四、DataX源码分析 TaskExecutor 五、DataX源码分析 reader 六、DataX源码分析 writer 七、DataX源码分析 Channel 文章目录 系列文章目录前言DataX的Writer写入流…...

为自己的项目媒体资源添加固定高度

为自己的项目媒体资源添加固定高度 未媒体资源添加固定高度&#xff0c;不仅有利于确定懒加载后的切确位置&#xff0c;还可以做骨架屏、loading动画等等&#xff0c;但是因为历史数据中很多没有加高度的媒体资源&#xff0c;所以一直嫌麻烦没有做。 直到这个季度有一个自上而…...

家政小程序系统源码开发:引领智能生活新篇章

随着科技的飞速发展&#xff0c;小程序作为一种便捷的应用形态&#xff0c;已经深入到我们生活的方方面面。尤其在家庭服务领域&#xff0c;家政小程序的出现为人们带来了前所未有的便利。它不仅简化了家政服务的流程&#xff0c;提升了服务质量&#xff0c;还为家政服务行业注…...

多表查询

目录 统计出一张数据表中的数据量 查询 dept 表中的数据量 查询 emp 表中的数据量 实现 emp 与 dept 的多表查询 笛卡尔积 消除笛卡尔积 把数据表 emp 的别名定为 e&#xff0c;数据表 dept 的别名定为 d&#xff0c;然后在查询中分别使用 e 和 d 代替这两个表 Oracle从…...

PHP开发日志 ━━ 深入理解三元操作与一般条件语句的不同

概况 三元运算符的功能与“if…else”流程语句一致。 在一般情况下&#xff0c;三元操作替换if条件语句可以精简代码&#xff0c;并且更为直观&#xff0c;但是在下面的情况中使用三元操作将会返回警告。 借图&#xff1a; 案例 比如原代码&#xff1a; class classA{publ…...

多维时序 | Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预测

多维时序 | Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预测 目录 多维时序 | Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.Matlab实现RF-Adaboost随机森林结合Adaboost多变量时间序列预…...

vue3-内置组件-Suspense

Suspense (实验性功能) <Suspense> 是一项实验性功能。它不一定会最终成为稳定功能&#xff0c;并且在稳定之前相关 API 也可能会发生变化。 <Suspense> 是一个内置组件&#xff0c;用来在组件树中协调对异步依赖的处理。它让我们可以在组件树上层等待下层的多个嵌…...

Rust入门:如何在windows + vscode中关闭程序codelldb.exe

在windows中用vscode单步调试rust程序的时候&#xff0c;发现无论是按下stop键&#xff0c;还是运行完程序&#xff0c;调试器codelldb.exe一直霸占着主程序不退出&#xff0c;如果此时对代码进行修改&#xff0c;后续就没法再编译调试了。 目前我也不知道要怎么处理这个事&am…...

git错误整理

remote: Support for password authentication was removed on August 13, 2021. 参考&#xff1a;这篇即可 GnuTLS recv error (-110): The TLS connection was non-properly terminated. 执行下面的指令&#xff1a; git config --global http.sslVerify false...

跟着cherno手搓游戏引擎【22】CameraController、Resize

前置&#xff1a; YOTO.h: #pragma once//用于YOTO APP#include "YOTO/Application.h" #include"YOTO/Layer.h" #include "YOTO/Log.h"#include"YOTO/Core/Timestep.h"#include"YOTO/Input.h" #include"YOTO/KeyCod…...

微信小程序(四十二)wechat-http拦截器

注释很详细&#xff0c;直接上代码 上一篇 新增内容&#xff1a; 1.wechat-http请求的封装 2.wechat-http请求的拦截器的用法演示 源码&#xff1a; utils/http.js import http from "wechat-http"//设置全局默认请求地址 http.baseURL "https://live-api.ith…...

tomcat部署zrlog

1.下载zrlog包&#xff0c;并添加到虚拟机中 1)进入/opt/apache-tomcat-8.5.90/webapps目录 cd /opt/apache-tomcat-8.5.90/webapps2)下载zrlog包 wget http://dl.zrlog.com/release/zrlog-1.7.1-baaecb9-release.war 3)重命名包 mv zrlog-1.7.1-baaecb9-release zrblog 2…...

Ubuntu Desktop 开机数字小键盘

Ubuntu Desktop 开机数字小键盘 1. 开机数字小键盘References 1. 开机数字小键盘 一般情况下&#xff0c;Ubuntu 开机后小键盘区是控制方向键而非数字键&#xff0c;每次开机后若用到数字键都需要按下 NumLock 键。 References [1] Yongqiang Cheng, https://yongqiang.blog…...

树莓派编程基础与硬件控制

1.编程语言 Python 是一种泛用型的编程语言&#xff0c;可以用于大量场景的程序开发中。根据基于谷歌搜 索指数的 PYPL&#xff08;程序语言流行指数&#xff09;统计&#xff0c;Python 是 2019 年 2 月全球范围内最为流行 的编程语言 相比传统的 C、Java 等编程语言&#x…...

autojs通过正则表达式获取带有数字的text内容

视频连接 视频连接 参考 参考 var ctextMatches(/\d/).findOne()console.log("当前金币"c.text()) // 获取当前金币UiSelector.textMatches(reg) reg {string} | {Regex} 要满足的正则表达式。 为当前选择器附加控件"text需要满足正则表达式reg"的条件。 …...

Android java基础_类的继承

一.Android Java基础_类的继承 先封装一个persion类&#xff0c;在persion的基础上定义Student类&#xff0c;并基础persion类。 子类能访问父类的成员函数。 class Person {private int age;public void setAge(int age) {if (age < 0 || age > 200)age 0;else {thi…...

nginx stream proxy 模块的ssl连接源码分析

目录 1. 源起2. 分析验证环境的配置3. 源码分析3.1 代理模块的请求入口点分析3.2 发起与上游服务器的连接3.3 连接回调3.4 TCP连接建立成功后为上下游数据透传做准备3.5 TCP连接的ssl上下文初始化3.6 ssl握手成功后的处理3.7 连接数据的收与发1. 源起 我一直来对ssl建立连接的过…...

C#面:Static Nested Class 和 Inner Class 有什么不同

这是两种不同的类嵌套方式。 Static Nested Class &#xff1a; 是一个静态嵌套类&#xff0c;它是在外部类中定义的一个静态类。它可以访问外部类的静态成员和方法&#xff0c;但不能直接访问外部类的非静态成员和方法。静态嵌套类可以独立于外部类实例化&#xff0c;即可以…...

LeetCode、208. 实现 Trie (前缀树)【中等,自定义数据结构】

文章目录 前言LeetCode、208. 实现 Trie (前缀树)【中等&#xff0c;自定义数据结构】题目链接与分类思路 资料获取 前言 博主介绍&#xff1a;✌目前全网粉丝2W&#xff0c;csdn博客专家、Java领域优质创作者&#xff0c;博客之星、阿里云平台优质作者、专注于Java后端技术领…...

java数据结构与算法刷题-----LeetCode151. 反转字符串中的单词

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 解题思路 这道题&#xff0c;可以理解为&#xff0c;将字符串颠倒&#xf…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

基于数字孪生的水厂可视化平台建设:架构与实践

分享大纲&#xff1a; 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年&#xff0c;数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段&#xff0c;基于数字孪生的水厂可视化平台的…...

C++八股 —— 单例模式

文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全&#xff08;Thread Safety&#xff09; 线程安全是指在多线程环境下&#xff0c;某个函数、类或代码片段能够被多个线程同时调用时&#xff0c;仍能保证数据的一致性和逻辑的正确性&#xf…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)

Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败&#xff0c;具体原因是客户端发送了密码认证请求&#xff0c;但Redis服务器未设置密码 1.为Redis设置密码&#xff08;匹配客户端配置&#xff09; 步骤&#xff1a; 1&#xff09;.修…...

rnn判断string中第一次出现a的下标

# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...

Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析

Java求职者面试指南&#xff1a;Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问&#xff08;基础概念问题&#xff09; 1. 请解释Spring框架的核心容器是什么&#xff1f;它在Spring中起到什么作用&#xff1f; Spring框架的核心容器是IoC容器&#…...

Go语言多线程问题

打印零与奇偶数&#xff08;leetcode 1116&#xff09; 方法1&#xff1a;使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...

uniapp 开发ios, xcode 提交app store connect 和 testflight内测

uniapp 中配置 配置manifest 文档&#xff1a;manifest.json 应用配置 | uni-app官网 hbuilderx中本地打包 下载IOS最新SDK 开发环境 | uni小程序SDK hbulderx 版本号&#xff1a;4.66 对应的sdk版本 4.66 两者必须一致 本地打包的资源导入到SDK 导入资源 | uni小程序SDK …...

libfmt: 现代C++的格式化工具库介绍与酷炫功能

libfmt: 现代C的格式化工具库介绍与酷炫功能 libfmt 是一个开源的C格式化库&#xff0c;提供了高效、安全的文本格式化功能&#xff0c;是C20中引入的std::format的基础实现。它比传统的printf和iostream更安全、更灵活、性能更好。 基本介绍 主要特点 类型安全&#xff1a…...

es6+和css3新增的特性有哪些

一&#xff1a;ECMAScript 新特性&#xff08;ES6&#xff09; ES6 (2015) - 革命性更新 1&#xff0c;记住的方法&#xff0c;从一个方法里面用到了哪些技术 1&#xff0c;let /const块级作用域声明2&#xff0c;**默认参数**&#xff1a;函数参数可以设置默认值。3&#x…...