当前位置: 首页 > news >正文

Spark - 继承 FileOutputFormat 实现向 HDFS 地址追加文件

目录

一.引言

二.源码浅析

1.RDD.saveAsTextFile

2.TextOutputFormat 

3.FileOutputFormat

三.源码修改

1.修改文件生成逻辑 - getRecordWriter

2.允许目录存在 - checkoutputSpecs

3.全部代码 - TextOutputFormatV2

四.追加存储代码实战

五.总结


一.引言

Output directory file XXX already exists 目标目录已经存在,这个报错写 Spark 的同学都不会陌生,它不允许我们在同一个目录持续增加文件存储。在使用 Flink 文件流场景中,我们有向 HDFS 目录追加文件的需求,所以下面我们尝试继承 FileOutputFormat 实现自定义文件追加。

二.源码浅析

自定义实现之前,我们需要明确一下追加文件的两个主要问题:

- 允许目录存在

即避免 Output directory file XXX already exists 的报错

- 避免 File 重复

由于是追加文件,这里我们要避免文件名相同导致追加失败

1.RDD.saveAsTextFile

这个是我们日常使用的文本落地 API:

这里将原始的 RDD[String] 转化为 (NullWritable.get(), text) 的 pairRDD 并调用后续的 saveAsHadoopFile 并传入 TextOutputFormat:

- NullWritable

在Hadoop1.+中是 "Comparable",因此编译器无法找到隐式为其排序,并将使用默认的 "null"。然而,它是一个 `Comparable[NullWritable]` 在 Hadoop2.+ 中,编译器将调用隐式  "Ordering.ordered” 方法来创建为 "NullWritable" 排序。这就是为什么编译器会生成不同的匿名Hadoop1.+和Hadoop2.+中“saveAsTextFile”的类。因此,在这里我们提供了一个显式排序“null”,以确保编译器生成 "saveAsTextFile" 的字节码相同。

这里翻译自官方 API,简言之,我们后续自定义 OutputFormat 时需要将 RDD[String] 转换为 PairRDD 并将 key 置为 NullWritable,否则这里无法调用 saveAsHadoopFile:

2.TextOutputFormat 

前者继承了后者,我们先看下前者复写了哪些函数:

非常简洁,大部分方法都使用父类的实现,这里 getRecordWriter 的具体实现对应我们上面提到的第二个问题即避免文件重复,因为其负责根据 name 轮训生成落地的 Path 地址,我们修改这个函数即可避免追加时文件重复。

3.FileOutputFormat

TextOutputFormat Extends FileOutputFormat,上面 TextOutputFormat 只实现了 Path 相关的工作,所以需要继续到父类 FileOutputFormat 寻找抛出异常的语句:

在文件中搜索 already exists 即可定位到当前函数,是不是很熟悉,因此针对第一个问题避免文件的报错就要修改这里了,最简单的我们把这三行注释掉即可。

三.源码修改

经过上面的源码三部曲,我们如何修改 TextOutputFormat 思路也很清晰了,修改文件生成逻辑、取消抛出异常即可,下面看一下代码实现:

1.修改文件生成逻辑 - getRecordWriter

这里我们在源码中增加了 updateFileName 函数,该函数由用户自己定义输出文件名,常规的我们可以按照 part-00000、part-00001 的顺序继续存储下去,当然如果为了区分追加文件的添加时间与类型,我们也可以给其打上时间戳和自定义标记,都不想用的话也可以直接用 UUID 代替:

import java.util.UUIDUUID.randomUUID().toString

下面看一下 updateFileName 的实现:

这里初始化变量 fileName 为 "",更新时对其加锁并基于当前 name 进行判断,如果当前 fileName 为 "",则代表是第一次保存,因此默认使用 name 的 part-00000,后续再多次存储是,我们就可以获取 fileName 的后缀进行累加输出了,这里使用 DecimalFormat 实现了自动补 0 的操作。

  static String fileName = "";DecimalFormat decimalFormat = new DecimalFormat("00000");public void updateFileName(String name) {synchronized (fileName) {if (fileName.equals("")) {fileName = name;} else {fileName = "part-" + decimalFormat.format(Integer.parseInt(fileName.split("-")[1]) + 1);}}}

2.允许目录存在 - checkoutputSpecs

把 throw Exception 的异常去掉就好了,这里保留了 Println 提示目录已经存在并开始追加。

3.全部代码 - TextOutputFormatV2

换个 TextOutputFormatV2 实现我们追加文件存储的目的。

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.text.DecimalFormat;import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.*;/*** An {@link OutputFormat} that writes plain text files.*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextOutputFormatV2<K, V> extends FileOutputFormat<K, V> {static String fileName = "";DecimalFormat decimalFormat = new DecimalFormat("00000");public void updateFileName(String name) {synchronized (fileName) {if (fileName.equals("")) {fileName = name;} else {fileName = "part-" + decimalFormat.format(Integer.parseInt(fileName.split("-")[1]) + 1);}}}protected static class LineRecordWriter<K, V>implements RecordWriter<K, V> {private static final String utf8 = "UTF-8";private static final byte[] newline;static {try {newline = "\n".getBytes(utf8);} catch (UnsupportedEncodingException uee) {throw new IllegalArgumentException("can't find " + utf8 + " encoding");}}protected DataOutputStream out;private final byte[] keyValueSeparator;public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {this.out = out;try {this.keyValueSeparator = keyValueSeparator.getBytes(utf8);} catch (UnsupportedEncodingException uee) {throw new IllegalArgumentException("can't find " + utf8 + " encoding");}}public LineRecordWriter(DataOutputStream out) {this(out, "\t");}/*** Write the object to the byte stream, handling Text as a special* case.* @param o the object to print* @throws IOException if the write throws, we pass it on*/private void writeObject(Object o) throws IOException {if (o instanceof Text) {Text to = (Text) o;out.write(to.getBytes(), 0, to.getLength());} else {out.write(o.toString().getBytes(utf8));}}public synchronized void write(K key, V value)throws IOException {boolean nullKey = key == null || key instanceof NullWritable;boolean nullValue = value == null || value instanceof NullWritable;if (nullKey && nullValue) {return;}if (!nullKey) {writeObject(key);}if (!(nullKey || nullValue)) {out.write(keyValueSeparator);}if (!nullValue) {writeObject(value);}out.write(newline);}public synchronized void close(Reporter reporter) throws IOException {out.close();}}@Overridepublic void checkOutputSpecs(FileSystem ignored, JobConf job)throws FileAlreadyExistsException,InvalidJobConfException, IOException {// Ensure that the output directory is set and not already therePath outDir = getOutputPath(job);if (outDir == null && job.getNumReduceTasks() != 0) {throw new InvalidJobConfException("Output directory not set in JobConf.");}if (outDir != null) {FileSystem fs = outDir.getFileSystem(job);// normalize the output directoryoutDir = fs.makeQualified(outDir);setOutputPath(job, outDir);// get delegation token for the outDir's file systemTokenCache.obtainTokensForNamenodes(job.getCredentials(),new Path[] {outDir}, job);// check its existenceif (fs.exists(outDir)) {System.out.println("Output directory " + outDir + " already exists, Start Append!");}}}public RecordWriter<K, V> getRecordWriter(FileSystem ignored,JobConf job,String name,Progressable progress)throws IOException {boolean isCompressed = getCompressOutput(job);String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator","\t");updateFileName(name);if (!isCompressed) {Path file = FileOutputFormat.getTaskOutputPath(job, fileName);FileSystem fs = file.getFileSystem(job);FSDataOutputStream fileOut = fs.create(file, progress);return new com.CommonTool.TextOutputFormatV2.LineRecordWriter<K, V>(fileOut, keyValueSeparator);} else {Class<? extends CompressionCodec> codecClass =getOutputCompressorClass(job, GzipCodec.class);// create the named codecCompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);// build the filename including the extensionPath file =FileOutputFormat.getTaskOutputPath(job,fileName + codec.getDefaultExtension());FileSystem fs = file.getFileSystem(job);FSDataOutputStream fileOut = fs.create(file, progress);return new com.CommonTool.TextOutputFormatV2.LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);}}
}

四.追加存储代码实战

  def main(args: Array[String]): Unit = {val (argsList, argsMap) = ArgsParseUtil.parseArgs(args)val conf = (new SparkConf).setAppName("AppendFileToHdfs").setMaster("local[*]")val spark = SparkSession.builder.config(conf).getOrCreate()val sc = spark.sparkContextval output = argsMap.getOrElse("output", "./append_output")val data = sc.parallelize(0 to 1000).mapPartitions { iter =>val text = new Text()iter.map { x =>text.set(x.toString)(NullWritable.get(), text)}}(0 until 10).foreach(epoch => {// 存储一次无法继续存储// data.saveAsTextFile(output)data.saveAsHadoopFile(output, classOf[NullWritable], classOf[Text], classOf[TextOutputFormatV2[NullWritable, String]])})}

首先转化为 (NullWritable.get(), text) 的 pairRDD,随后调用 saveAsHadoopFile 方法并传入我们自定义的 TextOutputFormatV2 即可,由于我们 for 循环了 10 次,所以打印了 10 个相关日志。

下面再看下生成的文件:

每个文件存 250 个数字,每次存储 4 个 part,10 次 40 个且保持递增顺序。

五.总结

想要结合源码进行修改时,结合自己的需求,带着问题去找对应的函数再复写就 OK 了,由于这里逻辑比较简单所以我们也没踩太多坑。上面采用的是单次 Job 连续存储,所以 Format 里 FileName 能够达到累加的情况,如果是多个 Job 重复启动,则每次获取的都是 part-00000,这时如果还想要保持文件名递增的话可以使用 FileSystem.listStatus 遍历文件夹获取 modifyTime 最新的文件并取其 name 即可拿到最新的文件名 part,此时将参数传入 TextFormat 并修改 update 逻辑即可实现多 Job 重复启动且文件名递增的需求了。当然了,使用 UUID + Date 是最省事滴。

相关文章:

Spark - 继承 FileOutputFormat 实现向 HDFS 地址追加文件

目录 一.引言 二.源码浅析 1.RDD.saveAsTextFile 2.TextOutputFormat 3.FileOutputFormat 三.源码修改 1.修改文件生成逻辑 - getRecordWriter 2.允许目录存在 - checkoutputSpecs 3.全部代码 - TextOutputFormatV2 四.追加存储代码实战 五.总结 一.引言 Output d…...

树莓派编程控制继电器及继电器组

目录 一&#xff0c;继电器说明 ● 继电器接口说明 ① 继电器输入端: ② 继电器输出端: 二&#xff0c;树莓派控制继电器 三&#xff0c;树莓派控制继电器组 一&#xff0c;继电器说明 通俗点讲&#xff0c;可以把继电器理解成是一些功能设备的控制开关。 ● LOW&#…...

oracle和mysql的区别

Oracle与MySQL的区别以及优缺点 MySQL的特点 1、性能卓越&#xff0c;服务稳定&#xff0c;很少出现异常宕机&#xff1b; 2、开放源代码无版本制约&#xff0c;自主性及使用成本低&#xff1b; 3、历史悠久&#xff0c;社区和用户非常活跃&#xff0c;遇到问题及时寻求帮助…...

<Linux开发> linux应用开发-之-uart通信开发例程

一、简介 串口全称叫做串行接口&#xff0c;串行接口指的是数据一个一个的按顺序传输&#xff0c;通信线路简单。使用两条线即可. 实现双向通信&#xff0c;一条用于发送&#xff0c;一条用于接收。串口通信距离远&#xff0c;但是速度相对会低&#xff0c;串口是一种很常用的工…...

基于深度学习的安全帽检测系统(YOLOv5清新界面版,Python代码)

摘要&#xff1a;安全帽检测系统用于自动化监测安全帽佩戴情况&#xff0c;在需要佩戴安全帽的场合自动安全提醒&#xff0c;实现图片、视频和摄像头等多种形式监测。在介绍算法原理的同时&#xff0c;给出Python的实现代码、训练数据集&#xff0c;以及PyQt的UI界面。安全帽检…...

Linux - 进程控制(进程替换)

0.引入创建子进程的目的是什么&#xff1f;就是为了让子进程帮我执行特定的任务让子进程执行父进程的一部分代码如果子进程想执行一个全新的程序代码呢&#xff1f; 那么就要使用进程的程序替换为什么要有程序替换&#xff1f;也就是说子进程想执行一个全新的程序代码&#xff…...

Java中 ==和equals的区别是什么?

作用&#xff1a; 基本类型&#xff0c;比较值是否相等引用类型&#xff0c;比较内存地址值是否相等不能比较没有父子关系的两个对象equals()方法的作用&#xff1a; JDK 中的类一般已经重写了 equals()&#xff0c;比较的是内容自定义类如果没有重写 equals()&#xff0c;将…...

Linux(网络基础---网络层)

文章目录0. 前言1. IP协议1-1 基本概念1-2 协议头格式2. 网段划分2-1 基本概念2.2 IP地址分五大类2-3 特殊的IP地址2-4 IP地址的数量限制2-5 私有IP地址和公网IP地址2-6 路由0. 前言 前面我们讲了&#xff0c;应用层、传输层&#xff1b;本章讲网络层。 应用层&#xff1a;我…...

空间信息智能应用团队研究成果介绍及人才引进

目录1、多平台移动测量技术1.1 车载移动测量系统1.2 机载移动测量系统2、数据处理与应用技术研究2.1 点云与影像融合2.2 点云配准与拼接2.3 点云滤波与分类2.4 道路矢量地图提取2.5 道路三维自动建模2.6 道路路面三维病害分析2.7 多期点云三维变形分析2.8 地表覆盖遥感监测分析…...

ChatGPT应用场景与工具推荐

目录 写在前面 一、关于ChatGPT 二、应用实例 1.写文章 2.入门新的知识 3.解决疑难问题 4.生成预演问题 5.文本改写 6.语言翻译 7.思维导图 8.PDF阅读理解 9.操作格式化的数据 10.模拟场景 11.写代码 三、现存局限 写在前面 本文会简单介绍ChatGPT的特点、局限以…...

图像分类卷积神经网络模型综述

图像分类卷积神经网络模型综述遇到问题 图像分类&#xff1a;核心任务是从给定的分类集合中给图像分配一个标签任务。 输入&#xff1a;图片 输出&#xff1a;类别。 数据集MNIST数据集 MNIST数据集是用来识别手写数字&#xff0c;由0~9共10类别组成。 从MNIST数据集的SD-1和…...

艹,终于在8226上把灯点亮了

接上次点文章ESP8266还可以这样玩这次&#xff0c;我终于学会了在ESP8266上面点亮LED灯了现在一个单片机的价格是几块&#xff0c;加上一个晶振&#xff0c;再来一个快递费&#xff0c;十几块钱还是需要的。所以能用这个ESP8266来当单片机玩&#xff0c;还是比较不错的可以在ub…...

脱不下孔乙己的长衫,现代的年轻人该怎么办?

“如果我没读过书&#xff0c;我还可以做别的工作&#xff0c;可我偏偏读过书” “学历本该是我的敲门砖&#xff0c;却成了我脱不下的长衫。” 最近&#xff0c;“脱下孔乙己的长衫”在网上火了。在鲁迅的原著小说中&#xff0c;孔乙己属于知识阶级&#xff08;长衫客&#xf…...

Matlab实现遗传算法

遗传算法&#xff08;Genetic Algorithm&#xff0c;GA&#xff09;是一种基于生物进化理论的优化算法&#xff0c;通过模拟自然界中的遗传过程&#xff0c;来寻找最优解。 在遗传算法中&#xff0c;每个解被称为个体&#xff0c;每个个体由一组基因表示&#xff0c;每个基因是…...

评价公式-均方误差

均方误差的公式可以通过以下步骤推导得出&#xff1a; 假设有n个样本&#xff0c;真实值分别为y₁, y₂, ……, yₙ&#xff0c;预测值分别为ŷ₁, ŷ₂, ……, ŷₙ。 首先&#xff0c;我们可以定义误差&#xff08;error&#xff09;为预测值与真实值之间的差&#xff1a; …...

冲击蓝桥杯-时间问题(必考)

目录 前言&#xff1a; 一、时间问题 二、使用步骤 1、考察小时&#xff0c;分以及秒的使用、 2、判断日期是否合法 3、遍历日期 4、推算星期几 总结 前言&#xff1a; 时间问题可以说是蓝桥杯&#xff0c;最喜欢考的问题了,因为时间问题不涉及到算法和一些复杂的知识&#xf…...

10个杀手级应用的Python自动化脚本

10个杀手级应用的Python自动化脚本 重复的任务总是耗费时间和枯燥的。想象一下&#xff0c;逐一裁剪100张照片&#xff0c;或者做诸如Fetching APIs、纠正拼写和语法等任务&#xff0c;所有这些都需要大量的时间。为什么不把它们自动化呢&#xff1f;在今天的文章中&#xff0c…...

2023​史上最全软件测试工程师常见的面试题总结​ 备战金三银四

在这里我给大家推荐一套专门讲解软件测试简历&#xff0c;和面试题的视频&#xff0c;实测有效&#xff0c;建议大家可以看看&#xff01; 春招必看已上岸&#xff0c;软件测试常问面试题【全网最详细&#xff0c;让你不再踩坑】_哔哩哔哩_bilibili春招必看已上岸&#xff0c;…...

2023年全国最新安全员精选真题及答案29

百分百题库提供安全员考试试题、建筑安全员考试预测题、建筑安全员ABC考试真题、安全员证考试题库等&#xff0c;提供在线做题刷题&#xff0c;在线模拟考试&#xff0c;助你考试轻松过关。 81.&#xff08;单选题&#xff09;同一建筑施工企业在12个月内连续发生&#xff08;&…...

关系数据库的7个基本特征

文章目录关系数据库中的二维表─般满足7个基本特征:①元组(行)个数是有限的——元组个数有限性。 ②元组(行)均不相同——元组的唯—性。 ③元组(行)的次序可以任意交换——元组的次序无关性。 ④元组(行)的分量是不可分割的基本特征——元组分量的原子性。 ⑤属性(列)名各不相…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

反向工程与模型迁移:打造未来商品详情API的可持续创新体系

在电商行业蓬勃发展的当下&#xff0c;商品详情API作为连接电商平台与开发者、商家及用户的关键纽带&#xff0c;其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息&#xff08;如名称、价格、库存等&#xff09;的获取与展示&#xff0c;已难以满足市场对个性化、智能…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

Golang dig框架与GraphQL的完美结合

将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用&#xff0c;可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器&#xff0c;能够帮助开发者更好地管理复杂的依赖关系&#xff0c;而 GraphQL 则是一种用于 API 的查询语言&#xff0c;能够提…...

大语言模型如何处理长文本?常用文本分割技术详解

为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

面向无人机海岸带生态系统监测的语义分割基准数据集

描述&#xff1a;海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而&#xff0c;目前该领域仍面临一个挑战&#xff0c;即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...

如何应对敏捷转型中的团队阻力

应对敏捷转型中的团队阻力需要明确沟通敏捷转型目的、提升团队参与感、提供充分的培训与支持、逐步推进敏捷实践、建立清晰的奖励和反馈机制。其中&#xff0c;明确沟通敏捷转型目的尤为关键&#xff0c;团队成员只有清晰理解转型背后的原因和利益&#xff0c;才能降低对变化的…...

Kafka主题运维全指南:从基础配置到故障处理

#作者&#xff1a;张桐瑞 文章目录 主题日常管理1. 修改主题分区。2. 修改主题级别参数。3. 变更副本数。4. 修改主题限速。5.主题分区迁移。6. 常见主题错误处理常见错误1&#xff1a;主题删除失败。常见错误2&#xff1a;__consumer_offsets占用太多的磁盘。 主题日常管理 …...

如何配置一个sql server使得其它用户可以通过excel odbc获取数据

要让其他用户通过 Excel 使用 ODBC 连接到 SQL Server 获取数据&#xff0c;你需要完成以下配置步骤&#xff1a; ✅ 一、在 SQL Server 端配置&#xff08;服务器设置&#xff09; 1. 启用 TCP/IP 协议 打开 “SQL Server 配置管理器”。导航到&#xff1a;SQL Server 网络配…...

医疗AI模型可解释性编程研究:基于SHAP、LIME与Anchor

1 医疗树模型与可解释人工智能基础 医疗领域的人工智能应用正迅速从理论研究转向临床实践,在这一过程中,模型可解释性已成为确保AI系统被医疗专业人员接受和信任的关键因素。基于树模型的集成算法(如RandomForest、XGBoost、LightGBM)因其卓越的预测性能和相对良好的解释性…...