当前位置: 首页 > news >正文

SpringBoot操作spark处理hdfs文件

SpringBoot操作spark处理hdfs文件

  • 在这里插入图片描述

1、导入依赖

  • <!--        spark依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.2.2</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.12</artifactId><version>3.2.2</version></dependency>
    

2、配置spark信息

  • 建立一个配置文件,配置spark信息
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;//将文件交于spring管理
@Configuration
public class SparkConfig {//使用yml中的配置@Value("${spark.master}")private String sparkMaster;@Value("${spark.appName}")private String sparkAppName;@Value("${hdfs.user}")private String hdfsUser;@Value("${hdfs.path}")private String hdfsPath;@Beanpublic SparkConf sparkConf() {SparkConf conf = new SparkConf();conf.setMaster(sparkMaster);conf.setAppName(sparkAppName);// 添加HDFS配置conf.set("fs.defaultFS", hdfsPath);conf.set("spark.hadoop.hdfs.user",hdfsUser);return conf;}@Beanpublic SparkSession sparkSession() {return SparkSession.builder().config(sparkConf()).getOrCreate();}
}

3、controller和service

  • controller类

    • import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.web.bind.annotation.GetMapping;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;
      import xyz.zzj.traffic_main_code.service.SparkService;@RestController
      @RequestMapping("/spark")
      public class SparkController {@Autowiredprivate SparkService sparkService;@GetMapping("/run")public String runSparkJob() {//读取Hadoop HDFS文件String filePath = "hdfs://192.168.44.128:9000/subwayData.csv";sparkService.executeHadoopSparkJob(filePath);return "Spark job executed successfully!";}
      }
      
  • 处理地铁数据的service

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import xyz.zzj.traffic_main_code.service.SparkReadHdfs;import java.io.IOException;
import java.net.URI;
import static org.apache.spark.sql.functions.*;@Service
public class SparkReadHdfsImpl implements SparkReadHdfs {private final SparkSession spark;@Value("${hdfs.user}")private String hdfsUser;@Value("${hdfs.path}")private String hdfsPath;@Autowiredpublic SparkReadHdfsImpl(SparkSession spark) {this.spark = spark;}/*** 读取HDFS上的CSV文件并上传到HDFS* @param filePath*/@Overridepublic void sparkSubway(String filePath) {try {// 设置Hadoop配置JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());Configuration hadoopConf = jsc.hadoopConfiguration();hadoopConf.set("fs.defaultFS", hdfsPath);hadoopConf.set("hadoop.user.name", hdfsUser);// 读取HDFS上的文件Dataset<Row> df = spark.read().option("header", "true") // 指定第一行是列名.option("inferSchema", "true") // 自动推断列的数据类型.csv(filePath);// 显示DataFrame的所有数据
//            df.show(Integer.MAX_VALUE, false);// 对DataFrame进行清洗和转换操作// 检查缺失值df.select("number", "people", "dateTime").na().drop().show();// 对数据进行类型转换Dataset<Row> df2 = df.select(col("number").cast(DataTypes.IntegerType),col("people").cast(DataTypes.IntegerType),to_date(col("dateTime"), "yyyy年MM月dd日").alias("dateTime"));// 去重Dataset<Row> df3 = df2.dropDuplicates();// 数据过滤,确保people列没有负数Dataset<Row> df4 = df3.filter(col("people").geq(0));
//            df4.show();// 数据聚合,按dateTime分组,统计每天的总客流量Dataset<Row> df6 = df4.groupBy("dateTime").agg(sum("people").alias("total_people"));
//            df6.show();sparkForSubway(df6,"/time_subwayData.csv");//数据聚合,获取每天人数最多的地铁numberDataset<Row> df7 = df4.groupBy("dateTime").agg(max("people").alias("max_people"));sparkForSubway(df7,"/everyday_max_subwayData.csv");//数据聚合,计算每天的客流强度:每天总people除以632840Dataset<Row> df8 = df4.groupBy("dateTime").agg(sum("people").divide(632.84).alias("strength"));sparkForSubway(df8,"/everyday_strength_subwayData.csv");} catch (Exception e) {e.printStackTrace();}}private static void sparkForSubway(Dataset<Row> df6, String hdfsPath) throws IOException {// 保存处理后的数据到HDFSdf6.coalesce(1).write().mode("overwrite").option("header", "true").csv("hdfs://192.168.44.128:9000/time_subwayData");// 创建Hadoop配置Configuration conf = new Configuration();// 获取FileSystem实例FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.44.128:9000"), conf);// 定义临时目录和目标文件路径Path tempDir = new Path("/time_subwayData");FileStatus[] files = fs.listStatus(tempDir);// 检查目标文件是否存在,如果存在则删除Path targetFile1 = new Path(hdfsPath);if (fs.exists(targetFile1)) {fs.delete(targetFile1, true); // true 表示递归删除}for (FileStatus file : files) {if (file.isFile() && file.getPath().getName().startsWith("part-")) {Path targetFile = new Path(hdfsPath);fs.rename(file.getPath(), targetFile);}}// 删除临时目录fs.delete(tempDir, true);}}

4、运行

  • 项目运行完后,打开浏览器
    • spark处理地铁数据
      • http://localhost:8686/spark/dispose
  • 观察spark和hdfs
    • http://192.168.44.128:8099/
    • http://192.168.44.128:9870/explorer.html#/
      • image-20250109095551610

相关文章:

SpringBoot操作spark处理hdfs文件

SpringBoot操作spark处理hdfs文件 1、导入依赖 <!-- spark依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.2</version></dependency><depend…...

消息队列架构、选型、专有名词解释

私人博客传送门 消息队列专有名词解释 | 魔筝炼药师 MQ选型 | 魔筝炼药师 MQ架构 | 魔筝炼药师 MQ顺序消息 | 魔筝炼药师...

用OpenCV实现UVC视频分屏

分屏 OpencvUVC代码验证后话 用OpenCV实现UVC摄像头的视频分屏。 Opencv opencv里有很多视频图像的处理功能。 UVC Usb 视频类&#xff0c;免驱动的。视频流格式有MJPG和YUY2。MJPG是RGB三色通道的。要对三通道进行分屏显示。 代码 import cv2 import numpy as np video …...

Allure 集成 pytest

Allure 是一个强大的测试报告工具&#xff0c;与 pytest 集成可以生成详细的测试报告&#xff0c;包括测试步骤、测试数据、截图、错误堆栈等。 1. 安装 Allure 和相关依赖 安装 pytest-allure-adaptor 插件&#xff1a; pip install allure-pytest确保本地已安装 Allure 工具。…...

【Python】构建智能语音助手:使用Python实现语音识别与合成的全面指南

随着人工智能技术的迅猛发展&#xff0c;语音助手已成为人们日常生活中不可或缺的一部分。从智能手机到智能家居设备&#xff0c;语音交互提供了便捷高效的人机交互方式。本文旨在全面介绍如何利用Python编程语言及其强大的库——SpeechRecognition和gTTS&#xff0c;构建一个基…...

在 Arthas 中调用 Spring Bean 方法

获取 Spring 应用上下文 使用工具类 如果你的项目中有一个工具类实现了 ApplicationContextAware 接口&#xff0c;如 cn.shutdown.pf.utils.SpringContextUtils&#xff0c;可以使用该类获取 ApplicationContext&#xff1a; Component public final class SpringContextUt…...

Nginx入门笔记

Nginx入门笔记 一、Nginx基本概念二、代理1、正向代理2、反向代理 三、准备工作1、CentOS 7安装nginx&#xff08;1&#xff09;. 安装必要的依赖&#xff08;2&#xff09;下载nginx&#xff08;3&#xff09;编译安装&#xff08;4&#xff09;编译并安装 Nginx(5)启动nginx …...

【单片机】实现一个简单的ADC滤波器

实现一个 ADC的滤波器&#xff0c;PT1 滤波器&#xff08;也称为一阶低通滤波器&#xff09;&#xff0c;用于对输入信号进行滤波处理。 typedef struct PT1FilterSettings PT1FilterSettings; struct PT1FilterSettings {//! last Filter output valueuint32_t filtValOld;//…...

开源 vGPU 方案 HAMi 解析

开源 vGPU 方案 HAMi 一、k8s 环境下 GPU 资源管理的现状与问题 &#xff08;一&#xff09;资源感知与绑定 在 k8s 中&#xff0c;资源与节点紧密绑定。对于 GPU 资源&#xff0c;我们依赖 NVIDIA 提供的 device-plugin 来进行感知&#xff0c;并将其上报到 kube-apiserver…...

备考蓝桥杯:顺序表详解(静态顺序表,vector用法)

目录 1.顺序表的概念 2.静态顺序表的实现 总代码 3.stl库动态顺序表vector 测试代码 1.顺序表的概念 要理解顺序表&#xff0c;我们要先了解一下什么是线性表 线性表是n个具有相同特征的数据元素的序列 这就是一个线性表 a1是表头 a4是表尾 a2是a3的前驱 a3是a2的后继 空…...

OA系统如何做好DDOS防护

OA系统如何做好DDOS防护&#xff1f;在数字化办公蔚然成风的当下&#xff0c;OA&#xff08;办公自动化&#xff09;系统作为企业内部管理与协作的神经中枢&#xff0c;其安全性和稳定性直接关系到企业的日常运营效率、信息流通效率以及长远发展。OA系统不仅承载着企业内部的日…...

使用 Python 的 pyttsx3 库进行文本转语音

1. 什么是 pyttsx3&#xff1f; 1.1 pyttsx3 是一个 Python 库&#xff0c;它可以将文本转换为语音。与其他文本转语音库&#xff08;如 gTTS&#xff09;不同&#xff0c;pyttsx3 不依赖于网络服务&#xff0c;它使用本地的 TTS&#xff08;Text-to-Speech&#xff09;引擎&a…...

如何在Windows上编译OpenCV4.7.0

前言 ​ 参考&#xff1a;Win10 下编译 OpenCV 4.7.0详细全过程&#xff0c;包含xfeatures2d 这里在其基础上还出现了一些问题&#xff0c;仅供参考。 正文 一、环境 1、win10 2、cmake-gui 3、opencv4.7.0 4、VS2019 二、编译过程 1、下载需要的文件&#xff1a; 通…...

【玩转全栈】----Django连接MySQL

阅前先赞&#xff0c;养好习惯&#xff01; 目录 1、ORM框架介绍 选择建议 2、安装mysqlclient 3、创建数据库 4、修改settings&#xff0c;连接数据库 5、对数据库进行操作 创建表 删除表 添加数据 删除数据 修改&#xff08;更新&#xff09;数据&#xff1a; 获取数据 1、OR…...

25/1/4 算法笔记<强化学习> 生成对抗模仿学习

基于生成对抗网络的模仿学习&#xff0c;假设存在一个专家智能体&#xff0c;其策略可以看成最优策略&#xff0c;我们就可以通过直接模仿这个专家在环境中交互的动作数据来训练一个策略&#xff0c;并不需要用到环境提供的奖励信息。 生成对抗模仿学习GAIL实质上就是模仿了专家…...

Flink维表方案选型

Iceberg Iceberg 采用全量预加载数据的方式将维度表数据全部加载到内存中进行关联&#xff0c;虽然可以避免频繁访问外部数据库&#xff0c;但对计算节点的内存消耗很高&#xff0c;不能适用于数量很大的维度表。除此之外&#xff0c;当 Iceberg 维表数据更新后&#xff0c;可…...

Oracle Database 23ai 新特性: UPDATE 和 DELETE 语句的直接联接

Oracle Database 23c 引入了一系列令人振奋的新特性&#xff0c;其中一项尤为引人注目的是对 UPDATE 和 DELETE 语句支持直接联接&#xff08;Direct Join&#xff09;。这一新功能极大地简化了复杂数据操作的实现&#xff0c;提升了性能&#xff0c;并为数据库开发者提供了更强…...

机器学习之随机森林算法实现和特征重要性排名可视化

随机森林算法实现和特征重要性排名可视化 目录 随机森林算法实现和特征重要性排名可视化1 随机森林算法1.1 概念1.2 主要特点1.3 优缺点1.4 步骤1.5 函数及参数1.5.1 函数导入1.5.2 参数 1.6 特征重要性排名 2 实际代码测试 1 随机森林算法 1.1 概念 是一种基于树模型的集成学…...

网络安全图谱以及溯源算法

​ 本文提出了一种网络攻击溯源框架&#xff0c;以及一种网络安全知识图谱&#xff0c;该图由六个部分组成&#xff0c;G <H&#xff0c;V&#xff0c;A&#xff0c;E&#xff0c;L&#xff0c;S&#xff0c;R>。 1|11.知识图 ​ 网络知识图由六个部分组成&#xff0c…...

单片机-外部中断

中断是指 CPU 在处理某一事件 A 时&#xff0c;发生了另一事件 B&#xff0c;请求 CPU 迅速去处理(中断发生)&#xff1b;CPU 暂时停止当前的工作(中断响应)&#xff0c; 转去处理事件 B(中断服务)&#xff1b;待 CPU 将事件 B 处理完毕后&#xff0c;再回到原来事件 A 被中断的…...

Python_day48随机函数与广播机制

在继续讲解模块消融前&#xff0c;先补充几个之前没提的基础概念 尤其需要搞懂张量的维度、以及计算后的维度&#xff0c;这对于你未来理解复杂的网络至关重要 一、 随机张量的生成 在深度学习中经常需要随机生成一些张量&#xff0c;比如权重的初始化&#xff0c;或者计算输入…...

C#学习12——预处理

一、预处理指令&#xff1a; 解释&#xff1a;是在编译前由预处理器执行的命令&#xff0c;用于控制编译过程。这些命令以 # 开头&#xff0c;每行只能有一个预处理指令&#xff0c;且不能包含在方法或类中。 个人理解&#xff1a;就是游戏里面的备战阶段&#xff08;不同对局…...

Linux——TCP和UDP

一、TCP协议 1.特点 TCP提供的是面向连接、可靠的、字节流服务。 2.编程流程 &#xff08;1&#xff09;服务器端的编程流程 ①socket() 方法创建套接字 ②bind()方法指定套接字使用的IP地址和端口。 ③listen()方法用来创建监听队列。 ④accept()方法处理客户端的连接…...

如何用 HTML 展示计算机代码

原文&#xff1a;如何用 HTML 展示计算机代码 | w3cschool笔记 &#xff08;请勿将文章标记为付费&#xff01;&#xff01;&#xff01;&#xff01;&#xff09; 在编程学习和文档编写过程中&#xff0c;清晰地展示代码是一项关键技能。HTML 作为网页开发的基础语言&#x…...

SpringBoot3中使用虚拟线程的详细过程

在 Spring Boot 3 中使用 Java 21 的虚拟线程&#xff08;Virtual Threads&#xff09;可以显著提升 I/O 密集型应用的并发能力。以下是详细实现步骤&#xff1a; 1. 环境准备 JDK 21&#xff1a;确保安装 JDK 21 或更高版本Spring Boot 3.2&#xff1a;最低要求&#xff08;p…...

FPGA点亮ILI9488驱动的SPI+RGB接口LCD显示屏(一)

FPGA点亮ILI9488驱动的SPIRGB接口LCD显示屏 ILI9488 RGB接口初始化 目录 前言 一、ILI9488简介 二、3线SPI接口简介 三、配置寄存器介绍 四、手册和初始化verilog FPGA代码 总结 前言 ILI9488是一款广泛应用于嵌入式系统和电子设备的彩色TFT LCD显示控制器芯片。本文将介…...

用电脑通过USB总线连接控制keysight示波器

通过USB总线控制示波器的优势 在上篇文章我介绍了如何通过网线远程连接keysight示波器&#xff0c;如果连接的距离不是很远&#xff0c;也可以通过USB线将示波器与电脑连接起来&#xff0c;实现对示波器的控制和截图。 在KEYSIGHT示波器DSOX1204A的后端&#xff0c;除了有网口…...

【评测】Qwen3-Embedding模型初体验

每一篇文章前后都增加返回目录 回到目录 【评测】Qwen3-Embedding模型初体验 模型的介绍页面 本机配置&#xff1a;八代i5-8265U&#xff0c;16G内存&#xff0c;无GPU核显运行&#xff0c;win10操作系统 ollama可以通过下面命令拉取模型&#xff1a; ollama pull modelscope…...

免费批量PDF转Word工具

免费批量PDF转Word工具 工具简介 这是一款简单易用的批量PDF转Word工具&#xff0c;支持&#xff1a; 批量转换多个PDF文件保留原始格式和布局快速高效的转换速度完全免费使用 工具地址 下载链接 网盘下载地址&#xff1a;点击下载 提取码&#xff1a;8888 功能特点 ✅…...

C语言 | C代码编写中的易错点总结

C语言易错点 **1. 指针与内存管理****2. 数组与字符串****3. 未初始化变量****4. 类型转换与溢出****5. 运算符优先级****6. 函数与参数传递****7. 宏定义陷阱****8. 结构体与内存对齐****9. 输入/输出函数****10. 其他常见问题****最佳实践**在C语言编程中,由于其底层特性和灵…...