DataX源码分析-插件机制
系列文章目录
一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel
八、DataX源码分析-插件机制
文章目录
- 系列文章目录
- 前言
- 一、插件分类
- 插件目录结构
- 插件加载原理
前言
DataX的插件机制是其核心特性之一,它使得DataX能够灵活地适应各种不同的数据源的数据同步。这一机制主要基于插件开发框架,该框架主要包括Reader插件、Transformer插件、Writer插件。
DataX的插件机制还采用了框架+插件的架构。框架负责连接Reader和Writer插件,作为两者的数据传输通道,并处理缓冲、流控、并发、数据转换等核心技术问题。这种架构使得插件只需关心数据的读取或写入本身,而同步的共性问题则由框架来处理。
此外,DataX的插件机制还具有良好的扩展性和可维护性。开发者可以根据需要开发新的Reader或Writer插件来支持新的数据源类型,而无需修改DataX的核心框架代码。这种插件化的设计使得DataX能够适应不断变化的业务需求和技术环境。
在插件的加载和初始化方面,DataX使用了类似Java SPI(Service Provider Interface)的机制。它会在指定的插件目录中查找并加载插件,然后将其注册到插件注册中心。这样,当需要使用某个插件时,就可以从注册中心中获取其实例,并进行相应的操作。
总的来说,DataX的插件机制是一种非常灵活和可扩展的设计,它使得DataX能够适应各种不同的数据源和数据存储需求,同时也为开发者提供了丰富的扩展和定制化的可能性。
一、插件分类
按照功能分:
reader, 读插件,例如mysqlReader,从mysql读取数据
writer, 写插件。例如mysqlWriter,给mysql写入数据;
transformer, 中间结果转换,例如SubstrTransformer用于字符截取;
按照运行类型分:
Job级别的插件
Task级别的插件
插件目录结构
datax\plugin下分2个reader和writer目录,下面以mysql为例

plugin.json内容:
{"name": "mysqlreader","class": "xxx.plugin.reader.mysqlreader.MysqlReader","description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.","developer": "xx"
}

插件加载原理
- DataX进程启动入口为com.alibaba.datax.core.Engineengine.entry()
public static void entry(final String[] args) throws Throwable {Options options = new Options();options.addOption("job", true, "Job config.");options.addOption("jobid", true, "Job unique id.");options.addOption("mode", true, "Job runtime mode.");BasicParser parser = new BasicParser();CommandLine cl = parser.parse(options, args);String jobPath = cl.getOptionValue("job");// 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1String jobIdString = cl.getOptionValue("jobid");RUNTIME_MODE = cl.getOptionValue("mode");Configuration configuration = ConfigParser.parse(jobPath);}
- 读取并解析插件配置
ConfigParser.parse(final String jobPath)传入job路径,该方法组装解析,最后返回一个Configuration对象,Configuration里解析出了reader,writer,handler等插件名称;提取完插件名称后,会去reader目录和writer目录,寻找插件的位置。 - 动态加载插件
插件的加载都是通过自定义类加载器JarLoader动态加载,提供插件相关Jar隔离的加载机制。插件的加载接口由LoadUtil类负责,当要加载一个插件时,需要实例化一个JarLoader,然后切换thread class loader之后,才加载插件。这个主要由ClassLoaderSwapper实现。 - JarLoader类
JarLoader 负责加载指定路径下的插件 JAR 文件。它会检查 JAR 文件的合法性、有效性以及是否包含必要的插件实现类。继承自URLClassLoader提供Jar隔离的加载机制,会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。
/*** 提供Jar隔离的加载机制,会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。*/
public class JarLoader extends URLClassLoader{public JarLoader(String[] paths) {this(paths, JarLoader.class.getClassLoader());}public JarLoader(String[] paths, ClassLoader parent) {super(getURLs(paths), parent);}private static URL[] getURLs(String[] paths) {Validate.isTrue(null != paths && 0 != paths.length,"jar包路径不能为空.");List<String> dirs = new ArrayList<String>();for (String path : paths) {dirs.add(path);JarLoader.collectDirs(path, dirs);}List<URL> urls = new ArrayList<URL>();for (String path : dirs) {urls.addAll(doGetURLs(path));}return urls.toArray(new URL[0]);}private static void collectDirs(String path, List<String> collector) {if (null == path || StringUtils.isBlank(path)) {return;}File current = new File(path);if (!current.exists() || !current.isDirectory()) {return;}for (File child : current.listFiles()) {if (!child.isDirectory()) {continue;}collector.add(child.getAbsolutePath());collectDirs(child.getAbsolutePath(), collector);}}private static List<URL> doGetURLs(final String path) {Validate.isTrue(!StringUtils.isBlank(path), "jar包路径不能为空.");File jarPath = new File(path);Validate.isTrue(jarPath.exists() && jarPath.isDirectory(),"jar包路径必须存在且为目录.");/* set filter */FileFilter jarFilter = new FileFilter() {@Overridepublic boolean accept(File pathname) {return pathname.getName().endsWith(".jar");}};/* iterate all jar */File[] allJars = new File(path).listFiles(jarFilter);List<URL> jarURLs = new ArrayList<URL>(allJars.length);for (int i = 0; i < allJars.length; i++) {try {jarURLs.add(allJars[i].toURI().toURL());} catch (Exception e) {throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR,"系统加载jar包出错", e);}}return jarURLs;}
}
- LoadUtil
LoadUtil 是一个工具类,用于辅助插件的加载和初始化过程。LoadUtil 类通常包含静态方法,这些方法简化了插件加载的逻辑,使得 DataX 的核心框架能够与具体的插件进行交互。
LoadUtil 的主要职责包括:
插件加载:LoadUtil 提供了加载插件的方法。这些方法会根据配置文件中指定的插件类型和名称,使用 Java 的反射机制来加载插件的类定义。加载过程可能包括查找类路径下的 JAR 文件、读取插件的元数据以及验证插件的合法性。
插件实例化:一旦插件类被加载,LoadUtil 会负责创建插件的实例。这通常涉及到调用插件类的无参构造函数,并返回该实例的引用。LoadUtil 会处理任何与实例化相关的异常,以确保在出现问题时能够给出适当的错误消息。
插件注册:加载并实例化插件后,LoadUtil 可能会将插件实例注册到一个全局的插件注册中心。这样,DataX 的其他部分就可以在需要时获取并使用这些插件实例。
配置传递:LoadUtil 还可能负责将配置文件中针对插件的配置参数传递给插件实例。这确保了插件能够根据用户的配置进行正确的初始化。
错误处理:如果在加载、实例化或配置插件过程中发生错误,LoadUtil 会负责处理这些错误。这可能包括记录日志、抛出异常或采取其他恢复措施。
public class LoadUtil {private static final String pluginTypeNameFormat = "plugin.%s.%s";private LoadUtil() {}private enum ContainerType {Job("Job"), Task("Task");private String type;private ContainerType(String type) {this.type = type;}public String value() {return type;}}/*** 所有插件配置放置在pluginRegisterCenter中,为区别reader、transformer和writer,还能区别* 具体pluginName,故使用pluginType.pluginName作为key放置在该map中*/private static Configuration pluginRegisterCenter;/*** jarLoader的缓冲*/private static Map<String, JarLoader> jarLoaderCenter = new HashMap();/*** 设置pluginConfigs,方便后面插件来获取** @param pluginConfigs*/public static void bind(Configuration pluginConfigs) {pluginRegisterCenter = pluginConfigs;}private static String generatePluginKey(PluginType pluginType,String pluginName) {return String.format(pluginTypeNameFormat, pluginType.toString(),pluginName);}private static Configuration getPluginConf(PluginType pluginType,String pluginName) {Configuration pluginConf = pluginRegisterCenter.getConfiguration(generatePluginKey(pluginType, pluginName));if (null == pluginConf) {throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INSTALL_ERROR,String.format("DataX不能找到插件[%s]的配置.",pluginName));}return pluginConf;}/*** 加载JobPlugin,reader、writer都可能要加载** @param pluginType* @param pluginName* @return*/public static AbstractJobPlugin loadJobPlugin(PluginType pluginType,String pluginName) {Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Job);try {AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz.newInstance();jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));return jobPlugin;} catch (Exception e) {throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,String.format("DataX找到plugin[%s]的Job配置.",pluginName), e);}}/*** 加载taskPlugin,reader、writer都可能加载** @param pluginType* @param pluginName* @return*/public static AbstractTaskPlugin loadTaskPlugin(PluginType pluginType,String pluginName) {Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Task);try {AbstractTaskPlugin taskPlugin = (AbstractTaskPlugin) clazz.newInstance();taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName));return taskPlugin;} catch (Exception e) {throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,String.format("DataX不能找plugin[%s]的Task配置.",pluginName), e);}}/*** 根据插件类型、名字和执行时taskGroupId加载对应运行器** @param pluginType* @param pluginName* @return*/public static AbstractRunner loadPluginRunner(PluginType pluginType, String pluginName) {AbstractTaskPlugin taskPlugin = LoadUtil.loadTaskPlugin(pluginType,pluginName);switch (pluginType) {case READER:return new ReaderRunner(taskPlugin);case WRITER:return new WriterRunner(taskPlugin);default:throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,String.format("插件[%s]的类型必须是[reader]或[writer]!",pluginName));}}/*** 反射出具体plugin实例** @param pluginType* @param pluginName* @param pluginRunType* @return*/@SuppressWarnings("unchecked")private static synchronized Class<? extends AbstractPlugin> loadPluginClass(PluginType pluginType, String pluginName,ContainerType pluginRunType) {Configuration pluginConf = getPluginConf(pluginType, pluginName);JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);try {return (Class<? extends AbstractPlugin>) jarLoader.loadClass(pluginConf.getString("class") + "$"+ pluginRunType.value());} catch (Exception e) {throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);}}public static synchronized JarLoader getJarLoader(PluginType pluginType,String pluginName) {Configuration pluginConf = getPluginConf(pluginType, pluginName);JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,pluginName));if (null == jarLoader) {String pluginPath = pluginConf.getString("path");if (StringUtils.isBlank(pluginPath)) {throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,String.format("%s插件[%s]路径非法!",pluginType, pluginName));}jarLoader = new JarLoader(new String[]{pluginPath});jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),jarLoader);}return jarLoader;}
}
- ClassLoaderSwapper
ClassLoaderSwapper有一个属性storeClassLoader, 用于保存着当前线程的classLoader切换之前的ClassLoader。
/*** 为避免jar冲突,比如hbase可能有多个版本的读写依赖jar包,JobContainer和TaskGroupContainer,就需要脱离当前classLoader去加载这些jar包,执行完成后,又退回到原来classLoader上继续执行接下来的代码*/
public final class ClassLoaderSwapper {private ClassLoader storeClassLoader = null;private ClassLoaderSwapper() {}public static ClassLoaderSwapper newCurrentThreadClassLoaderSwapper() {return new ClassLoaderSwapper();}/*** 保存当前classLoader,并将当前线程的classLoader设置为所给classLoader** @param* @return*/public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader) {this.storeClassLoader = Thread.currentThread().getContextClassLoader();Thread.currentThread().setContextClassLoader(classLoader);return this.storeClassLoader;}/*** 将当前线程的类加载器设置为保存的类加载* @return*/public ClassLoader restoreCurrentThreadClassLoader() {ClassLoader classLoader = Thread.currentThread().getContextClassLoader();Thread.currentThread().setContextClassLoader(this.storeClassLoader);return classLoader;}
}
相关文章:
DataX源码分析-插件机制
系列文章目录 一、DataX详解和架构介绍 二、DataX源码分析 JobContainer 三、DataX源码分析 TaskGroupContainer 四、DataX源码分析 TaskExecutor 五、DataX源码分析 reader 六、DataX源码分析 writer 七、DataX源码分析 Channel 八、DataX源码分析-插件机制 文章目录 系列文章…...
容器高级知识: 适配器模式与 Sidecar 模式的区别
适配器模式与 Sidecar 模式的区别 在 Kubernetes 中,适配器模式和 Sidecar 模式都是扩展您的主应用程序容器功能的方法,但它们具有不同的目的和功能: Sidecar 模式: 通用目的: 为主应用程序提供 补充功能࿰…...
使用Xdisplay将ipad作为扩展显示器Agent闪退问题
1. 正常流程贴子挺多,可以参考这几篇 https://blog.csdn.net/Shi_Xueqing/article/details/129744496 如何将ipad作为win10的扩展屏(使用USB线连接)_ipad win usb 上网-CSDN博客 2.在进行pc端软件设置的时候发生闪退 解决方法:…...
DVXplorer事件相机入门
DV官方文档:Get Started DV (inivation.gitlab.io) DV事件相机的ROS包:https://github.com/uzh-rpg/rpg_dvs_ros 事件相机的资源汇总:https://github.com/uzh-rpg/event-based_vision_resources 1.DV事件相机ROS包驱动程序安装 注意&#x…...
ubuntu屏幕小的解决办法
1. 安装vmware tools , 再点自适应客户机 执行里面的vmware-install.pl这个文件 :sudo ./vmware-install.pl 执行不了可以放到家目录,我放在了/home/book 里面 最后点这个自适应客户机 然后我这里点不了是因为我点了控制台视图和拉伸客户机,…...
黑群晖一键修复:root、AME、DTS、转码、CPU型号等
食用方法:SSH连接群晖使用临时root权限执行 AME3.x激活补丁 只适用于x86_64的:DSM7.x Advanced Media Extensions (AME)版本3.0.1-2004、3.1.0-3005 激活过程需要下载官方的解码包,过程较慢,耐心等待。。。 DSM7.1和7.2的AME版…...
Repo命令使用实例(三十八)
简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒…...
2024年华为OD机试真题-分披萨-Python-OD统一考试(C卷)
题目描述: “吃货”和“馋嘴”两人到披萨店点了一份铁盘(圆形)披萨,并嘱咐店员将披萨按放射状切成大小相同的偶数扇形小块。但是粗心服务员将披萨切成了每块大小都完全不同奇数块,且肉眼能分辨出大小。 由于两人都想吃到最多的披萨,他们商量了一个他们认为公平的分法:从…...
找负环(图论基础)
文章目录 负环spfa找负环方法一方法二实际效果 负环 环内路径上的权值和为负。 spfa找负环 两种基本的方法 统计每一个点的入队次数,如果一个点入队了n次,则说明存在负环统计当前每个点中的最短路中所包含的边数,如果当前某个点的最短路所…...
无人机飞控算法原理基础研究,多旋翼无人机的飞行控制算法理论详解,无人机飞控软件架构设计
多旋翼无人机的飞行控制算法主要涉及到自动控制器、捷联式惯性导航系统、卡尔曼滤波算法和飞行控制PID算法等部分。 自动控制器是无人机飞行控制的核心部分,它负责接收来自无人机传感器和其他系统的信息,并根据预设的算法和逻辑,对无人机的姿…...
关于内存相关的梳理
1 关键字 总结 (lowmemory,anr in) 2 知识储备 虚拟机原理 垃圾回收算法 又包含标记 和清除两种算法 标记:程序计数器-已过时,可达性分析 具体可见 http://help.eclipse.org/luna/index.jsp?topic%2Forg.ec…...
7.JS里表达式,if条件判断,三元运算符,switch语句,断点调试
表达式和语句的区别 表达式就是可以被求值的代码比如什么a 1 语句就是一段可以执行的代码比如什么if else 直接给B站的黑马程序员的老师引流一波总结的真好 分支语句 就是基本上所有的语言都会有的if else 语句就是满足不同的条件执行不同的代码,让计算机有条件…...
RK3568平台开发系列讲解(存储篇)文件句柄与文件描述符介绍
🚀返回专栏总目录 文章目录 一、什么是文件句柄二、什么是文件描述符2.1、files_struct 结构体2.2、fdtable 结构体三、数据结构关系图沉淀、分享、成长,让自己和他人都能有所收获!😄 一、什么是文件句柄 用户空间的进程通过open系统调用打开一个文件之后,内核返回的就是…...
【C++】类和对象(五)友元、内部类、匿名对象
前言:前面我们说到类和对象是一个十分漫长的荆棘地,今天我们将走到终点,也就是说我们对于C算是正式的入门了。 💖 博主CSDN主页:卫卫卫的个人主页 💞 👉 专栏分类:高质量C学习 &…...
攻防世界 CTF Web方向 引导模式-难度1 —— 1-10题 wp精讲
目录 view_source robots backup cookie disabled_button get_post weak_auth simple_php Training-WWW-Robots view_source 题目描述: X老师让小宁同学查看一个网页的源代码,但小宁同学发现鼠标右键好像不管用了。 不能按右键,按F12 robots …...
Docker之MongoDB安装、创建用户及登录认证
Docker之MongoDB安装、创建用户及登录认证 文章目录 Docker之MongoDB安装、创建用户及登录认证1. 拉取镜像2. 创建宿主机容器数据卷3. 运行mongodb容器1. 运行容器2. 创建用户3. 创建数据库并设置密码 1. 拉取镜像 docker pull mongo:4.2.212. 创建宿主机容器数据卷 运行docke…...
紫微斗数双星组合:天机天梁在辰戌
文章目录 前言内容总结 前言 紫微斗数双星组合:天机天梁在辰戌 内容 紫微斗数双星组合:天机天梁在辰戌 性格分析 在紫微斗数命盘中,天梁星是一颗“荫星”,能够遇难呈祥,化解凶危,主寿,主贵。…...
N-144基于微信小程序在线订餐系统
开发工具:IDEA、微信小程序 服务器:Tomcat9.0, jdk1.8 项目构建:maven 数据库:mysql5.7 前端技术:vue、ElementUI、 Vant Weapp 服务端技术:springbootmybatisredis 本系统分微信小程序和…...
[UI5 常用控件] 09.IconTabBar,IconTabHeader,TabContainer
文章目录 前言1. IconTabBar1.1 简介1.2 基本结构1.3 用法1.3.1 颜色,拖放,溢出1.3.2 Icons Only , Inner Contents1.3.3 showAll,Count,key,IconTabSeparator 1.3.4 Only Text1.3.5 headerMode-Inline1.3.6 design,IconTabSeparator-icon1.3.7 DensityM…...
CCF编程能力等级认证GESP—C++5级—20231209
CCF编程能力等级认证GESP—C5级—20231209 单选题(每题 2 分,共 30 分)判断题(每题 2 分,共 20 分)编程题 (每题 25 分,共 50 分)小杨的幸运数烹饪问题 答案及解析单选题判断题编程题1编程题2 单…...
51c自动驾驶~合集58
我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...
MODBUS TCP转CANopen 技术赋能高效协同作业
在现代工业自动化领域,MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步,这两种通讯协议也正在被逐步融合,形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...
佰力博科技与您探讨热释电测量的几种方法
热释电的测量主要涉及热释电系数的测定,这是表征热释电材料性能的重要参数。热释电系数的测量方法主要包括静态法、动态法和积分电荷法。其中,积分电荷法最为常用,其原理是通过测量在电容器上积累的热释电电荷,从而确定热释电系数…...
SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题
分区配置 (ptab.json) img 属性介绍: img 属性指定分区存放的 image 名称,指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件,则以 proj_name:binary_name 格式指定文件名, proj_name 为工程 名&…...
浪潮交换机配置track检测实现高速公路收费网络主备切换NQA
浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求,本次涉及的主要是收费汇聚交换机的配置,浪潮网络设备在高速项目很少,通…...
深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用
文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么?1.1.2 感知机的工作原理 1.2 感知机的简单应用:基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...
wpf在image控件上快速显示内存图像
wpf在image控件上快速显示内存图像https://www.cnblogs.com/haodafeng/p/10431387.html 如果你在寻找能够快速在image控件刷新大图像(比如分辨率3000*3000的图像)的办法,尤其是想把内存中的裸数据(只有图像的数据,不包…...
c# 局部函数 定义、功能与示例
C# 局部函数:定义、功能与示例 1. 定义与功能 局部函数(Local Function)是嵌套在另一个方法内部的私有方法,仅在包含它的方法内可见。 • 作用:封装仅用于当前方法的逻辑,避免污染类作用域,提升…...
DAY 45 超大力王爱学Python
来自超大力王的友情提示:在用tensordoard的时候一定一定要用绝对位置,例如:tensorboard --logdir"D:\代码\archive (1)\runs\cifar10_mlp_experiment_2" 不然读取不了数据 知识点回顾: tensorboard的发展历史和原理tens…...
