当前位置: 首页 > news >正文

【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程

Apache SeaTunnel新版本已经发布,感兴趣的小伙伴可以看之前版本发布的文章

file

本文主要给大家介绍为使用2.3.4版本的新特性,需要对Apache SeaTunnel-Web依赖的版本进行升级,而SeaTunnel2.3.4版本部分API跟之前版本不兼容,所以需要对 SeaTunnel-Web的源码进行修改适配。

源码修改编译

克隆SeaYunnel-Web源码到本地

  git  clone https://github.com/apache/seatunnel-web.git

在idea中打开项目

升级Pom中的SeaTunnel版本到2.3.4并重新导入依赖

  <seatunnel-framework.version>2.3.3</seatunnel-framework.version>改为<seatunnel-framework.version>2.3.4</seatunnel-framework.version>

因为大部分用户使用SeaTunnel Web都是基于SeaTunnel-2.3.3 版本做的适配,而最新发布的SeaTunnel2.3.4 部分API发生了改动导致直接升级的过程中会出现API不兼容的问题,所以本篇文章重点来了:我们需要对调用SeaTunnel API的SeaTunnel Web源码部分进行修改,修改完之后,我们就能完全适配2.3.4最新版本。

社区推出了2.3.X及Web系列专属的社群,感兴趣的小伙伴可以加社区小助手进群。

org.apache.dolphinscheduler.api.dto.seatunnel.bean.engine.EngineDataType

public static class SeaTunnelDataTypeConvertorimplements DataTypeConvertor<SeaTunnelDataType<?>> {@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(String engineDataType) {return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();}@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)throws DataTypeConvertException {return seaTunnelDataType;}@Overridepublic SeaTunnelDataType<?> toConnectorType(SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)throws DataTypeConvertException {return seaTunnelDataType;}@Overridepublic String getIdentity() {return "EngineDataTypeConvertor";}
}
// 改为
public static class SeaTunnelDataTypeConvertorimplements DataTypeConvertor<SeaTunnelDataType<?>> {@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(String s, String s1) {return DATA_TYPE_MAP.get(s.toLowerCase(Locale.ROOT)).getRawType();}@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {return seaTunnelDataType;}@Overridepublic SeaTunnelDataType<?> toConnectorType(String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {return seaTunnelDataType;}@Overridepublic String getIdentity() {return "EngineDataTypeConvertor";}}

org.apache.seatunnel.app.service.impl.TableSchemaServiceImpl

public TableSchemaServiceImpl() throws IOException {Common.setStarter(true);Set<PluginIdentifier> pluginIdentifiers =SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();pluginIdentifiersList.addAll(pluginIdentifiers);List<URL> pluginJarPaths =new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);//        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();if (!pluginJarPaths.isEmpty()) {//            List<URL> files = FileUtils.searchJarFiles(path);pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));factory =new DataTypeConvertorFactory(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factory = new DataTypeConvertorFactory();}
}
// 改为public TableSchemaServiceImpl() throws IOException {Common.setStarter(true);Set<PluginIdentifier> pluginIdentifiers =SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();pluginIdentifiersList.addAll(pluginIdentifiers);List<URL> pluginJarPaths =new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);//        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();if (!pluginJarPaths.isEmpty()) {//            List<URL> files = FileUtils.searchJarFiles(path);pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));factory =new DataTypeConvertorFactory(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factory = new DataTypeConvertorFactory();}}SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
// 改为
SeaTunnelDataType<?> dataType =convertor.toSeaTunnelType(field.getName(), field.getType());

org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel()

 public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {Common.setDeployMode(DeployMode.CLIENT);JobConfig jobConfig = new JobConfig();jobConfig.setName(jobInstanceId + "_job");try {SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();SeaTunnelClient seaTunnelClient = createSeaTunnelClient();ClientJobExecutionEnvironment jobExecutionEnv =seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));jobInstanceDao.update(jobInstance);CompletableFuture.runAsync(() -> {waitJobFinish(clientJobProxy,userId,jobInstanceId,Long.toString(clientJobProxy.getJobId()),seaTunnelClient);});} catch (ExecutionException | InterruptedException e) {ExceptionUtils.getMessage(e);throw new RuntimeException(e);}return jobInstanceId;}

org.apache.seatunnel.app.service.impl.JobInstanceServiceImpl

else if (statusList.contains("CANCELLING")) {jobStatus = JobStatus.CANCELLING.name();
// 改为
else if (statusList.contains("CANCELING")) {jobStatus = JobStatus.CANCELING.name();

org.apache.seatunnel.app.service.impl.SchemaDerivationServiceImpl

TableFactoryContext context =new TableFactoryContext(Collections.singletonList(table),ReadonlyConfig.fromMap(config),Thread.currentThread().getContextClassLoader());
// 改为
TableTransformFactoryContext context =new TableTransformFactoryContext(Collections.singletonList(table),ReadonlyConfig.fromMap(config),Thread.currentThread().getContextClassLoader());

org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy

public void restoreJob(@NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);JobConfig jobConfig = new JobConfig();jobConfig.setName(jobInstanceId + "_job");try {seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();} catch (ExecutionException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}
}
// 改为
public void restoreJob(@NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);JobConfig jobConfig = new JobConfig();jobConfig.setName(jobInstanceId + "_job");SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();try {seaTunnelClient.restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId).execute();} catch (ExecutionException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}}

org.apache.seatunnel.app.thirdparty.framework.PluginDiscoveryUtil

public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(PluginType pluginType) throws IOException {Common.setStarter(true);if (!pluginType.equals(PluginType.SOURCE)) {throw new UnsupportedOperationException("ONLY support plugin type source");}Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();List<Factory> factories;if (path.toFile().exists()) {List<URL> files = FileUtils.searchJarFiles(path);factories =FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));} else {factories =FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());}Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();factories.forEach(plugin -> {if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;PluginIdentifier info =PluginIdentifier.of("seatunnel",PluginType.SOURCE.getType(),plugin.factoryIdentifier());featureMap.put(info,new ConnectorFeature(SupportColumnProjection.class.isAssignableFrom(tableSourceFactory.getSourceClass())));}});return featureMap;
}
// 改为public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(PluginType pluginType) {Common.setStarter(true);if (!pluginType.equals(PluginType.SOURCE)) {throw new UnsupportedOperationException("ONLY support plugin type source");}ArrayList<PluginIdentifier> pluginIdentifiers = new ArrayList<>();pluginIdentifiers.addAll(SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet());List<URL> pluginJarPaths =new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers);List<Factory> factories;if (!pluginJarPaths.isEmpty()) {factories =FactoryUtil.discoverFactories(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factories =FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());}Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();factories.forEach(plugin -> {if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;PluginIdentifier info =PluginIdentifier.of("seatunnel",PluginType.SOURCE.getType(),plugin.factoryIdentifier());featureMap.put(info,new ConnectorFeature(SupportColumnProjection.class.isAssignableFrom(tableSourceFactory.getSourceClass())));}});return featureMap;

代码格式化

mvn spotless:apply

编译打包

mvn clean package -DskipTests

至此,seatunnel web 适配 seatunnel2.3.4版本完成,对应的安装包会在 seatunnel-web-dist/target目录下生成

Linux部署测试

这里具体请参考之前社区其他老师发布的文章Apache SeaTunnel Web部署指南

重要的配置项

1、seatunnel-web数据库相关配置(application.yml) 
用来web服务中的数据持久化2、SEATUNNEL_HOME(环境变量)
seatunnel-web调用seaunnel的插件获取的API,扫描connector相关的连接器3、ST_WEB_HOME(环境变量)
seatunnel-web会加载seatunnel-web/datasource下的插件包,这里决定了seatunnel-web支持哪些数据源的任务定义4、重要的配置文件:
connector-datasource-mapper.yaml 
该配置文件配置了支持的数据源类型以及该数据源支持的数据同步方式等信息(比如是否支持多表同步、是否支持cdc等)
hazelcast-client.yaml 
seatunnel-web服务通过seatunnel-api的方式与seatunnel集群进行交互,该配置文件配置了集群节点等相关信息

感谢大家的阅读,希望对各位兄弟有所帮助,如果有任何疑问,欢迎来社区找我交流!

本文由 白鲸开源科技 提供发布支持!

相关文章:

【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程

Apache SeaTunnel新版本已经发布&#xff0c;感兴趣的小伙伴可以看之前版本发布的文章 本文主要给大家介绍为使用2.3.4版本的新特性&#xff0c;需要对Apache SeaTunnel-Web依赖的版本进行升级&#xff0c;而SeaTunnel2.3.4版本部分API跟之前版本不兼容&#xff0c;所以需要对 …...

数据集下载

一、数据集下载——谷歌Open images 谷歌Open-image-v6是由谷歌出资标注的一个超大型数据集&#xff0c;数据大小达到600多G&#xff0c;类别达到600多种分类&#xff0c;对于普通研究者而言&#xff0c;根本没办法全部下载下来做测试&#xff0c;也没必要。只需要下载与自己任…...

3、设计模式之工厂模式2(Factory)

一、什么是工厂模式 工厂模式属于创建型设计模式&#xff0c;它用于解耦对象的创建和使用。通常情况下&#xff0c;我们创建对象时需要使用new操作符&#xff0c;但是使用new操作符创建对象会使代码具有耦合性。工厂模式通过提供一个公共的接口&#xff0c;使得我们可以在不暴露…...

npm、nodejs和vue之间关系和区别介绍

本文讲解npm、Node.js和Vue.js这三者之间的关系和区别&#xff0c;以及它们各自的特点。 首先&#xff0c;让我们来了解一下Node.js。 **Node.js** 是一个开源的服务器端运行环境&#xff0c;它允许开发者使用JavaScript来编写服务器端的代码。在传统的Web开发中&#…...

DM数据库安装(Windows)

先解压安装包 点击setup安装 下一步 勾选接受然后下一步 下一步 选择典型安装下一步 下一步 搜索DM数据库配置助手然后一直下一步 然后搜索DM管理工具 登录 登录成功 widows版本安装成功...

Python的asyncio 多线程

-- 多线程、进程、协程是什么就不讲了&#xff0c;&#xff08;就是你理解的一边呼吸&#xff0c;一边看文章&#xff09; 仅解决问题的话&#xff0c;下边两篇不用看&#xff0c; Python 中的 async await 概念-CSDN博客 再深一点的看这个 Python中的多线程、进程、协程、…...

【分类讨论】【解析几何】【 数学】【推荐】1330. 翻转子数组得到最大的数组值

作者推荐 视频算法专题 本文涉及知识点 分类讨论 解析几何 LeetCode1330. 翻转子数组得到最大的数组值 给你一个整数数组 nums 。「数组值」定义为所有满足 0 < i < nums.length-1 的 |nums[i]-nums[i1]| 的和。 你可以选择给定数组的任意子数组&#xff0c;并将该子…...

一文了解Spring的SPI机制

文章目录 一文了解Spring的SPI机制Java SPIServiceLoader Spring SPISpringboot利用Spring SPI开发starter 一文了解Spring的SPI机制 Java SPI SPI 全称 Service Provider Interface &#xff0c;是 Java提供的一套用来被第三方实现或者扩展的接口&#xff0c;它可以用来启用…...

django根据时间(年月日)动态修改表名--方法一

方法一&#xff1a; 第一步&#xff1a;在models创建一个类&#xff0c;里边存放数据表中需要的字段&#xff0c;如下 class TemplateModel(models.Model):NowTime models.CharField(max_length5)name models.CharFiedld(max_length5)class Meta:abstract True # 基础类设…...

实现基本的登录功能

一、登录功能的前端处理过程 1、导入项目所需的图片和CSS等静态文件 参考代码存放client节点的/opt/code目录下 执行如下命令&#xff1a; [rootclient ~]# cp -r /opt/code/kongguan_web/src/assets/* /root/kongguan_web/src/assets/ 将参考代码中的css、icon、images等文…...

Java线程池实现原理及其在美团业务中的实践

随着计算机行业的飞速发展&#xff0c;摩尔定律逐渐失效&#xff0c;多核CPU成为主流。使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器。 J.U.C提供的线程池&#xff1a;ThreadPoolExecutor类&#xff0c;帮助开发人员管理线程并方便地执行并行任务。了解并合理…...

让AI给你写代码(四)—— 初步利用LangChain Agent根据输入生成,保存,执行

要进一步提升智能编码助手的效率&#xff0c;我觉得需要做到两点 1&#xff09; 进一步让主人聚焦于设计输入以及结果验证的循环 2&#xff09; 进一步让智能编码助手聚焦于代码实现和程序流程&#xff08;保存、打开&#xff0c;修订、执行、合并…&#xff09; 正好接触到LLM…...

Flutter does not exist

Flutter does not exist 原因&#xff1a;Generated.config 配置文件内路径缺失 原因&#xff1a;Flutter SDK缺失 通过配置文件查到Flutter SDK在本地的存放位置FLUTTER_FRAMEWORK_DIR/Users/haijunyan/Documents/flutter/bin/cache/artifacts/engine/ios 真机所需&#xf…...

AIX上安装gcc和g++

AIX的iso镜像中没有gcc的软件包&#xff0c;需要我们自己下载&#xff0c;我们可以在 Index of /download/rpmdb/deplists/aix72 下载对应gcc和g版本的依赖文件deps 我们使用的是4.9.4版本的软件包 我们首先安装gcc&#xff0c;在http://www.oss4aix.org/download/everythi…...

js实现扫描线填色算法使用canvas展示

算法原理 扫描线填色算法的基本思想是&#xff1a;用水平扫描线从上到下扫描由点线段构成的多段构成的多边形。每根扫描线与多边形各边产生一系列交点。将这些交点按照x坐标进行分类&#xff0c;将分类后的交点成对取出&#xff0c;作为两个端点&#xff0c;以所填的色彩画水平…...

考研模拟面试-题目【攻略】

考研模拟面试-题目【攻略】 前言版权推荐考研模拟面试-题目前面的问题通用问题专业题数据结构计算机网络操作系统数据库网络安全 手写题数据结构操作系统计算机网络 代码题基础代码题其他代码题 后面的问题补充题目 最后 前言 2023-10-19 12:00:57 以下内容源自《考研模拟面试…...

Frostmourne - Elasticsearch源日志告警配置

简介 配置Frostmourne 接入Elasticsearch源进行日志匹配告警&#xff0c;并静默规则&#xff0c;告警消息发送到企业微信&#xff0c;告警信息使用Markdown。 部署安装教程查看&#xff1a; https://songxwn.com/frostmourne_install ELK 安装教程&#xff1a;https://songx…...

GPT出现Too many requests in 1 hour. Try again later.

换节点 这个就不用多说了&#xff0c;你都可以上GPT帐号了&#xff0c;哈…… 清除cooki 然后退出账号&#xff0c;重新登录即可...

python爬虫实战——小红书

目录 1、博主页面分析 2、在控制台预先获取所有作品页的URL 3、在 Python 中读入该文件并做准备工作 4、处理图文类型作品 5、处理视频类型作品 6、异常访问而被中断的现象 7、完整参考代码 任务&#xff1a;在 win 环境下&#xff0c;利用 Python、webdriver、JavaS…...

Linux信号机制

目录 一、信号的概念 二、定时器 1. alarm函数 2. setitimer函数 3.signal和sigaction函数 三、使用SIGCHLD信号实现回收子进程 一、信号的概念 概念&#xff1a;信号是在软件层次上对中断机制的一种模拟&#xff0c;是一种异步通信方式 。所有信号的产生及处理全部都是由内…...

基于ViT的图像分类模型数据结构优化

基于ViT的图像分类模型数据结构优化 如果你用过ViT这类图像分类模型&#xff0c;可能会发现一个挺头疼的问题&#xff1a;模型跑起来慢&#xff0c;内存占用还特别大。一张图片进去&#xff0c;半天出不来结果&#xff0c;要是想批量处理&#xff0c;那更是卡得不行。 这其实…...

技术人的困境:参数碾压对手,客户却不选你?用“技术路线图叙事”破局

作为技术出身的B2B从业者&#xff0c;你可能一直坚信&#xff1a;技术够硬&#xff0c;就是最好的销售语言。你带着PPT去见客户&#xff0c;第一页&#xff1a;屈服强度对比图&#xff0c;比竞争对手高15%&#xff1b;第二页&#xff1a;专利清单&#xff0c;密密麻麻排了三页&…...

【研报221】2026年汽车零部件行业投资策略研究:智驾平权+人形机器人量产趋势

本报告提供限时下载&#xff0c;请查看文后提示以下仅为报告部分内容&#xff1a;此报告全面解析汽车零部件行业2026年的发展趋势与投资机会。2025年行业受益于乘用车销量增长&#xff0c;零部件板块收入同比8.3%&#xff0c;其中智能化赛道表现最优&#xff0c;收入增速达20.0…...

简述:openclaw应用二三事

简述&#xff1a;openclaw应用二三事 本blog地址&#xff1a;https://blog.csdn.net/hsg77...

【Java实战】Java集成AD域账号登录

目的&#xff1a; JAVA集成域账号登录的实现方案&#xff0c;通过LDAPUtil类实现对AD域的身份验证。系统采用双域服务器配置&#xff0c;支持主备切换。 关键点包括&#xff1a; 1) LDAP连接参数配置&#xff1b; 2) 区分管理员账号和普通域账号的双重验证机制&#xff0c;…...

FluentHub核心功能揭秘:现代UI、多标签任务与强大导航如何提升开发效率

FluentHub核心功能揭秘&#xff1a;现代UI、多标签任务与强大导航如何提升开发效率 【免费下载链接】FluentHub The stylish yet powerful GitHub client for Windows. 项目地址: https://gitcode.com/gh_mirrors/flu/FluentHub FluentHub是一款为Windows平台打造的时尚…...

day52 ResNet18 CBAM

在深度学习的旅程中&#xff0c;我们不断探索如何提升模型的性能。今天&#xff0c;我将分享我在 ResNet18 模型中插入 CBAM&#xff08;Convolutional Block Attention Module&#xff09;模块&#xff0c;并采用分阶段微调策略的实践过程。通过这个过程&#xff0c;我不仅提升…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器

——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的​​一体化测试平台​​&#xff0c;覆盖应用全生命周期测试需求&#xff0c;主要提供五大核心能力&#xff1a; ​​测试类型​​​​检测目标​​​​关键指标​​功能体验基…...

centos 7 部署awstats 网站访问检测

一、基础环境准备&#xff08;两种安装方式都要做&#xff09; bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats&#xff0…...

多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验

一、多模态商品数据接口的技术架构 &#xff08;一&#xff09;多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如&#xff0c;当用户上传一张“蓝色连衣裙”的图片时&#xff0c;接口可自动提取图像中的颜色&#xff08;RGB值&…...