Seatunnel解决ftp读取json文件无法读取数组以及格式化之后的json无法解析的问题
问题原因
在JsonRead这个方法里面 在源码中使用的逻辑是读取一行 然后把这个json进行解析
 但是这样存在一个问题 比如如果json的格式是这样的
 {
 name:“zhangsan”,
 age:25
 }
 如果是这样的话 第一行读到的内容就是 {
 显然 一个 { 并不是一个json 这样会导致解析json失败
问题解决的思路
我的方法是将整个文件中的内容全部解析
 然后使用Seatunnel中自带的JackJson这个工具类进行解析
 然后再获取到单个的Json对象 之后再解析成一个Json的字符串
 因为解析过之后的Json字符串肯定不存在换行 所以这种换行的问题算是规避了
 但是这样又引发了另一个问题就是 一下子加载全部的文件内容可能会导致内存飙升 而且解析json 构造对象这个过程也是比较耗费资源的
 但是我目前没有想出来更好的方法
 我目前的业务需求是 这种ftp的文件都是小文件 不存在特别大的json 所以我的这个方法是可以完成现在的需求的
修改代码的内容
要修改的代码的位置是
 org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
    @Overridepublic void readProcess(String path,String tableId,Collector<SeaTunnelRow> output,InputStream inputStream,Map<String, String> partitionsMap,String currentFileName)throws IOException {InputStream actualInputStream;switch (compressFormat) {case LZO:LzopCodec lzo = new LzopCodec();actualInputStream = lzo.createInputStream(inputStream);break;case NONE:actualInputStream = inputStream;break;default:log.warn("Json file does not support this compress type: {}",compressFormat.getCompressCodec());actualInputStream = inputStream;break;}try (BufferedReader reader =new BufferedReader(new InputStreamReader(actualInputStream, encoding))) {//TODO wxt 优先使用之前的方法try{reader.lines().forEach(line -> {try {SeaTunnelRow seaTunnelRow =deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8));if (isMergePartition) {int index = seaTunnelRowType.getTotalFields();for (String value : partitionsMap.values()) {seaTunnelRow.setField(index++, value);}}seaTunnelRow.setTableId(tableId);output.collect(seaTunnelRow);} catch (IOException e) {String errorMsg =String.format("Deserialize this jsonFile data [%s] failed, please check the origin data",line);throw new FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,errorMsg,e);}});}catch (Exception e){//region 我修改的内容//首先读取全部的内容// 将 BufferedReader 内容读取到一个 StringStringWriter stringWriter = new StringWriter();String line;while ((line = reader.readLine()) != null) {stringWriter.write(line);}String jsonContent = stringWriter.toString();// 判断 JSON 类型并处理ObjectMapper objectMapper = new ObjectMapper();JsonNode jsonNode = objectMapper.readTree(jsonContent);if (jsonNode.isArray()) {// 遍历数组并转换为单行字符串for (JsonNode node : jsonNode) {String singleLineJson = objectMapper.writeValueAsString(node);// region 这一部分是我直接从上面复制下来的try {SeaTunnelRow seaTunnelRow =deserializationSchema.deserialize(singleLineJson.getBytes(StandardCharsets.UTF_8));if (isMergePartition) {int index = seaTunnelRowType.getTotalFields();for (String value : partitionsMap.values()) {seaTunnelRow.setField(index++, value);}}seaTunnelRow.setTableId(tableId);output.collect(seaTunnelRow);} catch (IOException e1) {String errorMsg =String.format("Deserialize this jsonFile data [%s] failed, please check the origin data",singleLineJson);throw new FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,errorMsg,e);}// endregion}} else if (jsonNode.isObject()) {String singleLineJson = objectMapper.writeValueAsString(jsonNode);// region 这一部分是我直接从上面复制下来的try {SeaTunnelRow seaTunnelRow =deserializationSchema.deserialize(singleLineJson.getBytes(StandardCharsets.UTF_8));if (isMergePartition) {int index = seaTunnelRowType.getTotalFields();for (String value : partitionsMap.values()) {seaTunnelRow.setField(index++, value);}}seaTunnelRow.setTableId(tableId);output.collect(seaTunnelRow);} catch (IOException e1) {String errorMsg =String.format("Deserialize this jsonFile data [%s] failed, please check the origin data",singleLineJson);throw new FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,errorMsg,e);}// endregion}//endregion}}}
相关文章:
Seatunnel解决ftp读取json文件无法读取数组以及格式化之后的json无法解析的问题
问题原因 在JsonRead这个方法里面 在源码中使用的逻辑是读取一行 然后把这个json进行解析 但是这样存在一个问题 比如如果json的格式是这样的 { name:“zhangsan”, age:25 } 如果是这样的话 第一行读到的内容就是 { 显然 一个 { 并不是一个…...
Elasticsearch在liunx 中单机部署
下载配置 1、下载 官网下载地址 2、上传解压 tar -zxvf elasticsearch-XXX.tar.gz 3、新建组和用户 (elasticsearch 默认不允许root账户) #创建组 es groupadd es #新建用户 useradd ryzhang -g es 4、更改文件夹的用户权限 chown -R ryzhang …...
深入探索 HarmonyOS 的 Navigation 组件:灵活的页面管理与动态导航
在移动应用开发中,页面的跳转和导航一直是核心功能之一。对于 HarmonyOS 开发者来说,Navigation 组件提供了一个强大的工具来实现灵活的页面管理和导航体验。今天,我们将深入探讨如何使用 HarmonyOS 中的 Navigation 组件来管理页面跳转、工具…...
【CUDA】CUDA Hierarchy
【CUDA】CUDA 基本概念和 Hierarchy CUDA 编程基础:Host 和 Device 工作流程 首先简单介绍CUDA 编程的基本概念:讲解 Host(CPU)与 Device(GPU)的区别、内存管理以及 CUDA 运行时的工作机制。 Host&#x…...
28.100ASK_T113-PRO Linux+QT 显示一张照片
1.添加资源文件 2. 主要代码 #include "mainwindow.h" #include "ui_mainwindow.h" #include <QImage> #include <QPixmap>MainWindow::MainWindow(QWidget *parent) :QMainWindow(parent),ui(new Ui::MainWindow) {ui->setupUi(this);QIm…...
GitLab使用中遇到的一些问题-记录
错误内容一 Warning: Permanently added gitlab.com (ED25519) to the list of known hosts. gitgitlab.com: Permission denied (publickey). Could not read from remote repository. Please make sure you have the correct access rights and the repository exists. …...
【微服务】Docker
一、Docker基础 1、依赖的兼容问题:Docker允许开发中将应用、依赖、函数库、配置一起打包,形成可移植镜像Docker应用运行在容器中,使用沙箱机制,相互隔离。 2、如何解决开发、测试、生产环境有差异的问题:Docker镜像…...
【C#】书籍信息的添加、修改、查询、删除
文章目录 一、简介二、程序功能2.1 Book类属性:方法: 2.2 Program 类 三、方法:四、用户界面流程:五、程序代码六、运行效果 一、简介 简单的C#控制台应用程序,用于管理书籍信息。这个程序将允许用户添加、编辑、查看…...
Python 入门教程(2)搭建环境 | 2.4、VSCode配置Node.js运行环境
文章目录 一、VSCode配置Node.js运行环境1、软件安装2、安装Node.js插件3、配置VSCode4、创建并运行Node.js文件5、调试Node.js代码 一、VSCode配置Node.js运行环境 1、软件安装 安装下面的软件: 安装Node.js:Node.js官网 下载Node.js安装包。建议选择L…...
Spark常问面试题---项目总结
一、数据清洗,你都清洗什么?或者说 ETL 你是怎么做的? 我在这个项目主要清洗的式日志数据,日志数据传过来的json格式 去除掉无用的字段,过滤掉json格式不正确的脏数据 过滤清洗掉日志中缺少关键字段的数据ÿ…...
【AI系统】Auto-Tuning 原理
Auto-Tuning 原理 在硬件平台驱动算子运行需要使用各种优化方式来提高性能,然而传统的手工编写算子库面临各种窘境,衍生出了自动生成高性能算子的的方式,称为自动调优。在本文我们首先分析传统算子库面临的挑战,之后介绍基于 TVM…...
AMEYA360:上海永铭电子全新高压牛角型铝电解电容IDC3系列,助力AI服务器电源高效运转
随着数据中心和云计算的高速发展,AI服务器的能效要求日益提高。如何在有限空间内实现更高的功率密度和稳定的电源管理,成为AI服务器电源设计的一大挑战。永铭推出全新高压牛角型铝电解电容IDC3系列,以大容量、小尺寸的创新特性,为…...
echarts地图立体效果,echarts地图点击事件,echarts地图自定义自定义tooltip
一.地图立体效果 方法1:两层地图叠加 实现原理:geo数组中放入两个地图对象,通过修改zlevel属性以及top,left,right,bottom形成视觉差 配置项参考如下代码: geo: [{zlevel: 2,top: 96,map: map,itemStyle: {color: #091A51ee,opacity: 1,borderWidth: 2,borderColor: #16BAFA…...
什么是 Socket?
Socket(套接字)是计算机网络编程中的一个重要概念,它用于在不同计算机之间进行通信。Socket 提供了一种机制,使得应用程序可以通过网络发送和接收数据。Socket 通信通常基于 TCP/IP 协议,但也可以使用其他协议…...
【版本控制】SVN安装到使用一条路讲解
文章目录 安装使用 Subversion (SVN) 是一款集中式版本控制系统,广泛应用于团队协作和代码管理中。尽管随着 Git 的兴起,集中式版本控制逐渐被分布式工具取代,但 SVN 仍在许多企业项目中发挥着重要作用。它的简单、稳定和易用特性,…...
KVCKVO
KVC KVC意思是键值编码,是一种可以通过键名来访问对象属性的机制,也可以对属性进行赋值,包括私有属性,由于KVC的定义是对OC中的NSObject的扩展进行实现的,所以如果要使用KVC机制,那么这个类需要继承NSObje…...
PyQt设计界面优化 #qss #ui设计 #QMainWindow
思维导图 通过qss实现ui界面设计优化 Qss是Qt程序界面中用来设置控件的背景图片、大小、字体颜色、字体类型、按钮状态变化等属性,它是用来美化UI界面。实现界面和程序的分离,快速切换界面。 首先我们在Pytchram创建一个新目录 然后将我们所需要的图片打…...
Qt Serial Bus 前置介绍篇
文章目录 Qt Serial Bus 简介前言 什么是 Qt Serial Bus?Qt Serial Bus 的核心功能支持的协议1. **CAN 总线**2. **Modbus**3. **自定义协议** 应用场景优势总结 Qt Serial Bus 简介 前言 Qt Serial Bus 是 Qt 框架中的一个模块,用于与工业设备和嵌入式…...
12.2深度学习_项目实战
十、项目实战 鲍勃开了自己的手机公司。他想与苹果、三星等大公司展开硬仗。 他不知道如何估算自己公司生产的手机的价格。在这个竞争激烈的手机市场,你不能简单地假设事情。为了解决这个问题,他收集了各个公司的手机销售数据。 鲍勃想找出手机的特性(例…...
LeetCode 64. 最小路径和(HOT100)
第一次错误代码: class Solution { public:int minPathSum(vector<vector<int>>& grid) {int dp[205][205] {0};int m grid.size(),n grid[0].size();for(int i 1 ;i<m;i){for(int j 1;j<n;j){dp[i][j] min(dp[i][j-1],dp[i-1][j])gr…...
使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式
一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练
前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1):从基础到实战的深度解析-CSDN博客,但实际面试中,企业更关注候选人对复杂场景的应对能力(如多设备并发扫描、低功耗与高发现率的平衡)和前沿技术的…...
Linux-07 ubuntu 的 chrome 启动不了
文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了,报错如下四、启动不了,解决如下 总结 问题原因 在应用中可以看到chrome,但是打不开(说明:原来的ubuntu系统出问题了,这个是备用的硬盘&a…...
MySQL用户和授权
开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务: test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...
【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)
本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...
DingDing机器人群消息推送
文章目录 1 新建机器人2 API文档说明3 代码编写 1 新建机器人 点击群设置 下滑到群管理的机器人,点击进入 添加机器人 选择自定义Webhook服务 点击添加 设置安全设置,详见说明文档 成功后,记录Webhook 2 API文档说明 点击设置说明 查看自…...
Python 实现 Web 静态服务器(HTTP 协议)
目录 一、在本地启动 HTTP 服务器1. Windows 下安装 node.js1)下载安装包2)配置环境变量3)安装镜像4)node.js 的常用命令 2. 安装 http-server 服务3. 使用 http-server 开启服务1)使用 http-server2)详解 …...
Elastic 获得 AWS 教育 ISV 合作伙伴资质,进一步增强教育解决方案产品组合
作者:来自 Elastic Udayasimha Theepireddy (Uday), Brian Bergholm, Marianna Jonsdottir 通过搜索 AI 和云创新推动教育领域的数字化转型。 我们非常高兴地宣布,Elastic 已获得 AWS 教育 ISV 合作伙伴资质。这一重要认证表明,Elastic 作为 …...
