当前位置: 首页 > news >正文

Spark - 继承 FileOutputFormat 实现向 HDFS 地址追加文件

目录

一.引言

二.源码浅析

1.RDD.saveAsTextFile

2.TextOutputFormat 

3.FileOutputFormat

三.源码修改

1.修改文件生成逻辑 - getRecordWriter

2.允许目录存在 - checkoutputSpecs

3.全部代码 - TextOutputFormatV2

四.追加存储代码实战

五.总结


一.引言

Output directory file XXX already exists 目标目录已经存在,这个报错写 Spark 的同学都不会陌生,它不允许我们在同一个目录持续增加文件存储。在使用 Flink 文件流场景中,我们有向 HDFS 目录追加文件的需求,所以下面我们尝试继承 FileOutputFormat 实现自定义文件追加。

二.源码浅析

自定义实现之前,我们需要明确一下追加文件的两个主要问题:

- 允许目录存在

即避免 Output directory file XXX already exists 的报错

- 避免 File 重复

由于是追加文件,这里我们要避免文件名相同导致追加失败

1.RDD.saveAsTextFile

这个是我们日常使用的文本落地 API:

这里将原始的 RDD[String] 转化为 (NullWritable.get(), text) 的 pairRDD 并调用后续的 saveAsHadoopFile 并传入 TextOutputFormat:

- NullWritable

在Hadoop1.+中是 "Comparable",因此编译器无法找到隐式为其排序,并将使用默认的 "null"。然而,它是一个 `Comparable[NullWritable]` 在 Hadoop2.+ 中,编译器将调用隐式  "Ordering.ordered” 方法来创建为 "NullWritable" 排序。这就是为什么编译器会生成不同的匿名Hadoop1.+和Hadoop2.+中“saveAsTextFile”的类。因此,在这里我们提供了一个显式排序“null”,以确保编译器生成 "saveAsTextFile" 的字节码相同。

这里翻译自官方 API,简言之,我们后续自定义 OutputFormat 时需要将 RDD[String] 转换为 PairRDD 并将 key 置为 NullWritable,否则这里无法调用 saveAsHadoopFile:

2.TextOutputFormat 

前者继承了后者,我们先看下前者复写了哪些函数:

非常简洁,大部分方法都使用父类的实现,这里 getRecordWriter 的具体实现对应我们上面提到的第二个问题即避免文件重复,因为其负责根据 name 轮训生成落地的 Path 地址,我们修改这个函数即可避免追加时文件重复。

3.FileOutputFormat

TextOutputFormat Extends FileOutputFormat,上面 TextOutputFormat 只实现了 Path 相关的工作,所以需要继续到父类 FileOutputFormat 寻找抛出异常的语句:

在文件中搜索 already exists 即可定位到当前函数,是不是很熟悉,因此针对第一个问题避免文件的报错就要修改这里了,最简单的我们把这三行注释掉即可。

三.源码修改

经过上面的源码三部曲,我们如何修改 TextOutputFormat 思路也很清晰了,修改文件生成逻辑、取消抛出异常即可,下面看一下代码实现:

1.修改文件生成逻辑 - getRecordWriter

这里我们在源码中增加了 updateFileName 函数,该函数由用户自己定义输出文件名,常规的我们可以按照 part-00000、part-00001 的顺序继续存储下去,当然如果为了区分追加文件的添加时间与类型,我们也可以给其打上时间戳和自定义标记,都不想用的话也可以直接用 UUID 代替:

import java.util.UUIDUUID.randomUUID().toString

下面看一下 updateFileName 的实现:

这里初始化变量 fileName 为 "",更新时对其加锁并基于当前 name 进行判断,如果当前 fileName 为 "",则代表是第一次保存,因此默认使用 name 的 part-00000,后续再多次存储是,我们就可以获取 fileName 的后缀进行累加输出了,这里使用 DecimalFormat 实现了自动补 0 的操作。

  static String fileName = "";DecimalFormat decimalFormat = new DecimalFormat("00000");public void updateFileName(String name) {synchronized (fileName) {if (fileName.equals("")) {fileName = name;} else {fileName = "part-" + decimalFormat.format(Integer.parseInt(fileName.split("-")[1]) + 1);}}}

2.允许目录存在 - checkoutputSpecs

把 throw Exception 的异常去掉就好了,这里保留了 Println 提示目录已经存在并开始追加。

3.全部代码 - TextOutputFormatV2

换个 TextOutputFormatV2 实现我们追加文件存储的目的。

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.text.DecimalFormat;import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.*;/*** An {@link OutputFormat} that writes plain text files.*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextOutputFormatV2<K, V> extends FileOutputFormat<K, V> {static String fileName = "";DecimalFormat decimalFormat = new DecimalFormat("00000");public void updateFileName(String name) {synchronized (fileName) {if (fileName.equals("")) {fileName = name;} else {fileName = "part-" + decimalFormat.format(Integer.parseInt(fileName.split("-")[1]) + 1);}}}protected static class LineRecordWriter<K, V>implements RecordWriter<K, V> {private static final String utf8 = "UTF-8";private static final byte[] newline;static {try {newline = "\n".getBytes(utf8);} catch (UnsupportedEncodingException uee) {throw new IllegalArgumentException("can't find " + utf8 + " encoding");}}protected DataOutputStream out;private final byte[] keyValueSeparator;public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {this.out = out;try {this.keyValueSeparator = keyValueSeparator.getBytes(utf8);} catch (UnsupportedEncodingException uee) {throw new IllegalArgumentException("can't find " + utf8 + " encoding");}}public LineRecordWriter(DataOutputStream out) {this(out, "\t");}/*** Write the object to the byte stream, handling Text as a special* case.* @param o the object to print* @throws IOException if the write throws, we pass it on*/private void writeObject(Object o) throws IOException {if (o instanceof Text) {Text to = (Text) o;out.write(to.getBytes(), 0, to.getLength());} else {out.write(o.toString().getBytes(utf8));}}public synchronized void write(K key, V value)throws IOException {boolean nullKey = key == null || key instanceof NullWritable;boolean nullValue = value == null || value instanceof NullWritable;if (nullKey && nullValue) {return;}if (!nullKey) {writeObject(key);}if (!(nullKey || nullValue)) {out.write(keyValueSeparator);}if (!nullValue) {writeObject(value);}out.write(newline);}public synchronized void close(Reporter reporter) throws IOException {out.close();}}@Overridepublic void checkOutputSpecs(FileSystem ignored, JobConf job)throws FileAlreadyExistsException,InvalidJobConfException, IOException {// Ensure that the output directory is set and not already therePath outDir = getOutputPath(job);if (outDir == null && job.getNumReduceTasks() != 0) {throw new InvalidJobConfException("Output directory not set in JobConf.");}if (outDir != null) {FileSystem fs = outDir.getFileSystem(job);// normalize the output directoryoutDir = fs.makeQualified(outDir);setOutputPath(job, outDir);// get delegation token for the outDir's file systemTokenCache.obtainTokensForNamenodes(job.getCredentials(),new Path[] {outDir}, job);// check its existenceif (fs.exists(outDir)) {System.out.println("Output directory " + outDir + " already exists, Start Append!");}}}public RecordWriter<K, V> getRecordWriter(FileSystem ignored,JobConf job,String name,Progressable progress)throws IOException {boolean isCompressed = getCompressOutput(job);String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator","\t");updateFileName(name);if (!isCompressed) {Path file = FileOutputFormat.getTaskOutputPath(job, fileName);FileSystem fs = file.getFileSystem(job);FSDataOutputStream fileOut = fs.create(file, progress);return new com.CommonTool.TextOutputFormatV2.LineRecordWriter<K, V>(fileOut, keyValueSeparator);} else {Class<? extends CompressionCodec> codecClass =getOutputCompressorClass(job, GzipCodec.class);// create the named codecCompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);// build the filename including the extensionPath file =FileOutputFormat.getTaskOutputPath(job,fileName + codec.getDefaultExtension());FileSystem fs = file.getFileSystem(job);FSDataOutputStream fileOut = fs.create(file, progress);return new com.CommonTool.TextOutputFormatV2.LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);}}
}

四.追加存储代码实战

  def main(args: Array[String]): Unit = {val (argsList, argsMap) = ArgsParseUtil.parseArgs(args)val conf = (new SparkConf).setAppName("AppendFileToHdfs").setMaster("local[*]")val spark = SparkSession.builder.config(conf).getOrCreate()val sc = spark.sparkContextval output = argsMap.getOrElse("output", "./append_output")val data = sc.parallelize(0 to 1000).mapPartitions { iter =>val text = new Text()iter.map { x =>text.set(x.toString)(NullWritable.get(), text)}}(0 until 10).foreach(epoch => {// 存储一次无法继续存储// data.saveAsTextFile(output)data.saveAsHadoopFile(output, classOf[NullWritable], classOf[Text], classOf[TextOutputFormatV2[NullWritable, String]])})}

首先转化为 (NullWritable.get(), text) 的 pairRDD,随后调用 saveAsHadoopFile 方法并传入我们自定义的 TextOutputFormatV2 即可,由于我们 for 循环了 10 次,所以打印了 10 个相关日志。

下面再看下生成的文件:

每个文件存 250 个数字,每次存储 4 个 part,10 次 40 个且保持递增顺序。

五.总结

想要结合源码进行修改时,结合自己的需求,带着问题去找对应的函数再复写就 OK 了,由于这里逻辑比较简单所以我们也没踩太多坑。上面采用的是单次 Job 连续存储,所以 Format 里 FileName 能够达到累加的情况,如果是多个 Job 重复启动,则每次获取的都是 part-00000,这时如果还想要保持文件名递增的话可以使用 FileSystem.listStatus 遍历文件夹获取 modifyTime 最新的文件并取其 name 即可拿到最新的文件名 part,此时将参数传入 TextFormat 并修改 update 逻辑即可实现多 Job 重复启动且文件名递增的需求了。当然了,使用 UUID + Date 是最省事滴。

相关文章:

Spark - 继承 FileOutputFormat 实现向 HDFS 地址追加文件

目录 一.引言 二.源码浅析 1.RDD.saveAsTextFile 2.TextOutputFormat 3.FileOutputFormat 三.源码修改 1.修改文件生成逻辑 - getRecordWriter 2.允许目录存在 - checkoutputSpecs 3.全部代码 - TextOutputFormatV2 四.追加存储代码实战 五.总结 一.引言 Output d…...

树莓派编程控制继电器及继电器组

目录 一&#xff0c;继电器说明 ● 继电器接口说明 ① 继电器输入端: ② 继电器输出端: 二&#xff0c;树莓派控制继电器 三&#xff0c;树莓派控制继电器组 一&#xff0c;继电器说明 通俗点讲&#xff0c;可以把继电器理解成是一些功能设备的控制开关。 ● LOW&#…...

oracle和mysql的区别

Oracle与MySQL的区别以及优缺点 MySQL的特点 1、性能卓越&#xff0c;服务稳定&#xff0c;很少出现异常宕机&#xff1b; 2、开放源代码无版本制约&#xff0c;自主性及使用成本低&#xff1b; 3、历史悠久&#xff0c;社区和用户非常活跃&#xff0c;遇到问题及时寻求帮助…...

<Linux开发> linux应用开发-之-uart通信开发例程

一、简介 串口全称叫做串行接口&#xff0c;串行接口指的是数据一个一个的按顺序传输&#xff0c;通信线路简单。使用两条线即可. 实现双向通信&#xff0c;一条用于发送&#xff0c;一条用于接收。串口通信距离远&#xff0c;但是速度相对会低&#xff0c;串口是一种很常用的工…...

基于深度学习的安全帽检测系统(YOLOv5清新界面版,Python代码)

摘要&#xff1a;安全帽检测系统用于自动化监测安全帽佩戴情况&#xff0c;在需要佩戴安全帽的场合自动安全提醒&#xff0c;实现图片、视频和摄像头等多种形式监测。在介绍算法原理的同时&#xff0c;给出Python的实现代码、训练数据集&#xff0c;以及PyQt的UI界面。安全帽检…...

Linux - 进程控制(进程替换)

0.引入创建子进程的目的是什么&#xff1f;就是为了让子进程帮我执行特定的任务让子进程执行父进程的一部分代码如果子进程想执行一个全新的程序代码呢&#xff1f; 那么就要使用进程的程序替换为什么要有程序替换&#xff1f;也就是说子进程想执行一个全新的程序代码&#xff…...

Java中 ==和equals的区别是什么?

作用&#xff1a; 基本类型&#xff0c;比较值是否相等引用类型&#xff0c;比较内存地址值是否相等不能比较没有父子关系的两个对象equals()方法的作用&#xff1a; JDK 中的类一般已经重写了 equals()&#xff0c;比较的是内容自定义类如果没有重写 equals()&#xff0c;将…...

Linux(网络基础---网络层)

文章目录0. 前言1. IP协议1-1 基本概念1-2 协议头格式2. 网段划分2-1 基本概念2.2 IP地址分五大类2-3 特殊的IP地址2-4 IP地址的数量限制2-5 私有IP地址和公网IP地址2-6 路由0. 前言 前面我们讲了&#xff0c;应用层、传输层&#xff1b;本章讲网络层。 应用层&#xff1a;我…...

空间信息智能应用团队研究成果介绍及人才引进

目录1、多平台移动测量技术1.1 车载移动测量系统1.2 机载移动测量系统2、数据处理与应用技术研究2.1 点云与影像融合2.2 点云配准与拼接2.3 点云滤波与分类2.4 道路矢量地图提取2.5 道路三维自动建模2.6 道路路面三维病害分析2.7 多期点云三维变形分析2.8 地表覆盖遥感监测分析…...

ChatGPT应用场景与工具推荐

目录 写在前面 一、关于ChatGPT 二、应用实例 1.写文章 2.入门新的知识 3.解决疑难问题 4.生成预演问题 5.文本改写 6.语言翻译 7.思维导图 8.PDF阅读理解 9.操作格式化的数据 10.模拟场景 11.写代码 三、现存局限 写在前面 本文会简单介绍ChatGPT的特点、局限以…...

图像分类卷积神经网络模型综述

图像分类卷积神经网络模型综述遇到问题 图像分类&#xff1a;核心任务是从给定的分类集合中给图像分配一个标签任务。 输入&#xff1a;图片 输出&#xff1a;类别。 数据集MNIST数据集 MNIST数据集是用来识别手写数字&#xff0c;由0~9共10类别组成。 从MNIST数据集的SD-1和…...

艹,终于在8226上把灯点亮了

接上次点文章ESP8266还可以这样玩这次&#xff0c;我终于学会了在ESP8266上面点亮LED灯了现在一个单片机的价格是几块&#xff0c;加上一个晶振&#xff0c;再来一个快递费&#xff0c;十几块钱还是需要的。所以能用这个ESP8266来当单片机玩&#xff0c;还是比较不错的可以在ub…...

脱不下孔乙己的长衫,现代的年轻人该怎么办?

“如果我没读过书&#xff0c;我还可以做别的工作&#xff0c;可我偏偏读过书” “学历本该是我的敲门砖&#xff0c;却成了我脱不下的长衫。” 最近&#xff0c;“脱下孔乙己的长衫”在网上火了。在鲁迅的原著小说中&#xff0c;孔乙己属于知识阶级&#xff08;长衫客&#xf…...

Matlab实现遗传算法

遗传算法&#xff08;Genetic Algorithm&#xff0c;GA&#xff09;是一种基于生物进化理论的优化算法&#xff0c;通过模拟自然界中的遗传过程&#xff0c;来寻找最优解。 在遗传算法中&#xff0c;每个解被称为个体&#xff0c;每个个体由一组基因表示&#xff0c;每个基因是…...

评价公式-均方误差

均方误差的公式可以通过以下步骤推导得出&#xff1a; 假设有n个样本&#xff0c;真实值分别为y₁, y₂, ……, yₙ&#xff0c;预测值分别为ŷ₁, ŷ₂, ……, ŷₙ。 首先&#xff0c;我们可以定义误差&#xff08;error&#xff09;为预测值与真实值之间的差&#xff1a; …...

冲击蓝桥杯-时间问题(必考)

目录 前言&#xff1a; 一、时间问题 二、使用步骤 1、考察小时&#xff0c;分以及秒的使用、 2、判断日期是否合法 3、遍历日期 4、推算星期几 总结 前言&#xff1a; 时间问题可以说是蓝桥杯&#xff0c;最喜欢考的问题了,因为时间问题不涉及到算法和一些复杂的知识&#xf…...

10个杀手级应用的Python自动化脚本

10个杀手级应用的Python自动化脚本 重复的任务总是耗费时间和枯燥的。想象一下&#xff0c;逐一裁剪100张照片&#xff0c;或者做诸如Fetching APIs、纠正拼写和语法等任务&#xff0c;所有这些都需要大量的时间。为什么不把它们自动化呢&#xff1f;在今天的文章中&#xff0c…...

2023​史上最全软件测试工程师常见的面试题总结​ 备战金三银四

在这里我给大家推荐一套专门讲解软件测试简历&#xff0c;和面试题的视频&#xff0c;实测有效&#xff0c;建议大家可以看看&#xff01; 春招必看已上岸&#xff0c;软件测试常问面试题【全网最详细&#xff0c;让你不再踩坑】_哔哩哔哩_bilibili春招必看已上岸&#xff0c;…...

2023年全国最新安全员精选真题及答案29

百分百题库提供安全员考试试题、建筑安全员考试预测题、建筑安全员ABC考试真题、安全员证考试题库等&#xff0c;提供在线做题刷题&#xff0c;在线模拟考试&#xff0c;助你考试轻松过关。 81.&#xff08;单选题&#xff09;同一建筑施工企业在12个月内连续发生&#xff08;&…...

关系数据库的7个基本特征

文章目录关系数据库中的二维表─般满足7个基本特征:①元组(行)个数是有限的——元组个数有限性。 ②元组(行)均不相同——元组的唯—性。 ③元组(行)的次序可以任意交换——元组的次序无关性。 ④元组(行)的分量是不可分割的基本特征——元组分量的原子性。 ⑤属性(列)名各不相…...

OpenLayers 可视化之热力图

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 热力图&#xff08;Heatmap&#xff09;又叫热点图&#xff0c;是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...

【Python】 -- 趣味代码 - 小恐龙游戏

文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)

CSI-2 协议详细解析 (一&#xff09; 1. CSI-2层定义&#xff08;CSI-2 Layer Definitions&#xff09; 分层结构 &#xff1a;CSI-2协议分为6层&#xff1a; 物理层&#xff08;PHY Layer&#xff09; &#xff1a; 定义电气特性、时钟机制和传输介质&#xff08;导线&#…...

unix/linux,sudo,其发展历程详细时间线、由来、历史背景

sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析&#xff1a;CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展&#xff0c;AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者&#xff0c;分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:

根据万维钢精英日课6的内容&#xff0c;使用AI&#xff08;2025&#xff09;可以参考以下方法&#xff1a; 四个洞见 模型已经比人聪明&#xff1a;以ChatGPT o3为代表的AI非常强大&#xff0c;能运用高级理论解释道理、引用最新学术论文&#xff0c;生成对顶尖科学家都有用的…...

select、poll、epoll 与 Reactor 模式

在高并发网络编程领域&#xff0c;高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表&#xff0c;以及基于它们实现的 Reactor 模式&#xff0c;为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。​ 一、I…...

网络编程(UDP编程)

思维导图 UDP基础编程&#xff08;单播&#xff09; 1.流程图 服务器&#xff1a;短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...

怎么让Comfyui导出的图像不包含工作流信息,

为了数据安全&#xff0c;让Comfyui导出的图像不包含工作流信息&#xff0c;导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo&#xff08;推荐&#xff09;​​ 在 save_images 方法中&#xff0c;​​删除或注释掉所有与 metadata …...

STM32---外部32.768K晶振(LSE)无法起振问题

晶振是否起振主要就检查两个1、晶振与MCU是否兼容&#xff1b;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容&#xff08;CL&#xff09;与匹配电容&#xff08;CL1、CL2&#xff09;的关系 2. 如何选择 CL1 和 CL…...