SpringBoot操作spark处理hdfs文件
SpringBoot操作spark处理hdfs文件
1、导入依赖
-
<!-- spark依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.2.2</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.12</artifactId><version>3.2.2</version></dependency>
2、配置spark信息
- 建立一个配置文件,配置spark信息
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;//将文件交于spring管理
@Configuration
public class SparkConfig {//使用yml中的配置@Value("${spark.master}")private String sparkMaster;@Value("${spark.appName}")private String sparkAppName;@Value("${hdfs.user}")private String hdfsUser;@Value("${hdfs.path}")private String hdfsPath;@Beanpublic SparkConf sparkConf() {SparkConf conf = new SparkConf();conf.setMaster(sparkMaster);conf.setAppName(sparkAppName);// 添加HDFS配置conf.set("fs.defaultFS", hdfsPath);conf.set("spark.hadoop.hdfs.user",hdfsUser);return conf;}@Beanpublic SparkSession sparkSession() {return SparkSession.builder().config(sparkConf()).getOrCreate();}
}
3、controller和service
-
controller类
-
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import xyz.zzj.traffic_main_code.service.SparkService;@RestController @RequestMapping("/spark") public class SparkController {@Autowiredprivate SparkService sparkService;@GetMapping("/run")public String runSparkJob() {//读取Hadoop HDFS文件String filePath = "hdfs://192.168.44.128:9000/subwayData.csv";sparkService.executeHadoopSparkJob(filePath);return "Spark job executed successfully!";} }
-
-
处理地铁数据的service
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import xyz.zzj.traffic_main_code.service.SparkReadHdfs;import java.io.IOException;
import java.net.URI;
import static org.apache.spark.sql.functions.*;@Service
public class SparkReadHdfsImpl implements SparkReadHdfs {private final SparkSession spark;@Value("${hdfs.user}")private String hdfsUser;@Value("${hdfs.path}")private String hdfsPath;@Autowiredpublic SparkReadHdfsImpl(SparkSession spark) {this.spark = spark;}/*** 读取HDFS上的CSV文件并上传到HDFS* @param filePath*/@Overridepublic void sparkSubway(String filePath) {try {// 设置Hadoop配置JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());Configuration hadoopConf = jsc.hadoopConfiguration();hadoopConf.set("fs.defaultFS", hdfsPath);hadoopConf.set("hadoop.user.name", hdfsUser);// 读取HDFS上的文件Dataset<Row> df = spark.read().option("header", "true") // 指定第一行是列名.option("inferSchema", "true") // 自动推断列的数据类型.csv(filePath);// 显示DataFrame的所有数据
// df.show(Integer.MAX_VALUE, false);// 对DataFrame进行清洗和转换操作// 检查缺失值df.select("number", "people", "dateTime").na().drop().show();// 对数据进行类型转换Dataset<Row> df2 = df.select(col("number").cast(DataTypes.IntegerType),col("people").cast(DataTypes.IntegerType),to_date(col("dateTime"), "yyyy年MM月dd日").alias("dateTime"));// 去重Dataset<Row> df3 = df2.dropDuplicates();// 数据过滤,确保people列没有负数Dataset<Row> df4 = df3.filter(col("people").geq(0));
// df4.show();// 数据聚合,按dateTime分组,统计每天的总客流量Dataset<Row> df6 = df4.groupBy("dateTime").agg(sum("people").alias("total_people"));
// df6.show();sparkForSubway(df6,"/time_subwayData.csv");//数据聚合,获取每天人数最多的地铁numberDataset<Row> df7 = df4.groupBy("dateTime").agg(max("people").alias("max_people"));sparkForSubway(df7,"/everyday_max_subwayData.csv");//数据聚合,计算每天的客流强度:每天总people除以632840Dataset<Row> df8 = df4.groupBy("dateTime").agg(sum("people").divide(632.84).alias("strength"));sparkForSubway(df8,"/everyday_strength_subwayData.csv");} catch (Exception e) {e.printStackTrace();}}private static void sparkForSubway(Dataset<Row> df6, String hdfsPath) throws IOException {// 保存处理后的数据到HDFSdf6.coalesce(1).write().mode("overwrite").option("header", "true").csv("hdfs://192.168.44.128:9000/time_subwayData");// 创建Hadoop配置Configuration conf = new Configuration();// 获取FileSystem实例FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.44.128:9000"), conf);// 定义临时目录和目标文件路径Path tempDir = new Path("/time_subwayData");FileStatus[] files = fs.listStatus(tempDir);// 检查目标文件是否存在,如果存在则删除Path targetFile1 = new Path(hdfsPath);if (fs.exists(targetFile1)) {fs.delete(targetFile1, true); // true 表示递归删除}for (FileStatus file : files) {if (file.isFile() && file.getPath().getName().startsWith("part-")) {Path targetFile = new Path(hdfsPath);fs.rename(file.getPath(), targetFile);}}// 删除临时目录fs.delete(tempDir, true);}}
4、运行
- 项目运行完后,打开浏览器
- spark处理地铁数据
- http://localhost:8686/spark/dispose
- spark处理地铁数据
- 观察spark和hdfs
- http://192.168.44.128:8099/
- http://192.168.44.128:9870/explorer.html#/
相关文章:
SpringBoot操作spark处理hdfs文件
SpringBoot操作spark处理hdfs文件 1、导入依赖 <!-- spark依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.2</version></dependency><depend…...
消息队列架构、选型、专有名词解释
私人博客传送门 消息队列专有名词解释 | 魔筝炼药师 MQ选型 | 魔筝炼药师 MQ架构 | 魔筝炼药师 MQ顺序消息 | 魔筝炼药师...
用OpenCV实现UVC视频分屏
分屏 OpencvUVC代码验证后话 用OpenCV实现UVC摄像头的视频分屏。 Opencv opencv里有很多视频图像的处理功能。 UVC Usb 视频类,免驱动的。视频流格式有MJPG和YUY2。MJPG是RGB三色通道的。要对三通道进行分屏显示。 代码 import cv2 import numpy as np video …...
Allure 集成 pytest
Allure 是一个强大的测试报告工具,与 pytest 集成可以生成详细的测试报告,包括测试步骤、测试数据、截图、错误堆栈等。 1. 安装 Allure 和相关依赖 安装 pytest-allure-adaptor 插件: pip install allure-pytest确保本地已安装 Allure 工具。…...
【Python】构建智能语音助手:使用Python实现语音识别与合成的全面指南
随着人工智能技术的迅猛发展,语音助手已成为人们日常生活中不可或缺的一部分。从智能手机到智能家居设备,语音交互提供了便捷高效的人机交互方式。本文旨在全面介绍如何利用Python编程语言及其强大的库——SpeechRecognition和gTTS,构建一个基…...
在 Arthas 中调用 Spring Bean 方法
获取 Spring 应用上下文 使用工具类 如果你的项目中有一个工具类实现了 ApplicationContextAware 接口,如 cn.shutdown.pf.utils.SpringContextUtils,可以使用该类获取 ApplicationContext: Component public final class SpringContextUt…...
Nginx入门笔记
Nginx入门笔记 一、Nginx基本概念二、代理1、正向代理2、反向代理 三、准备工作1、CentOS 7安装nginx(1). 安装必要的依赖(2)下载nginx(3)编译安装(4)编译并安装 Nginx(5)启动nginx …...
【单片机】实现一个简单的ADC滤波器
实现一个 ADC的滤波器,PT1 滤波器(也称为一阶低通滤波器),用于对输入信号进行滤波处理。 typedef struct PT1FilterSettings PT1FilterSettings; struct PT1FilterSettings {//! last Filter output valueuint32_t filtValOld;//…...
开源 vGPU 方案 HAMi 解析
开源 vGPU 方案 HAMi 一、k8s 环境下 GPU 资源管理的现状与问题 (一)资源感知与绑定 在 k8s 中,资源与节点紧密绑定。对于 GPU 资源,我们依赖 NVIDIA 提供的 device-plugin 来进行感知,并将其上报到 kube-apiserver…...
备考蓝桥杯:顺序表详解(静态顺序表,vector用法)
目录 1.顺序表的概念 2.静态顺序表的实现 总代码 3.stl库动态顺序表vector 测试代码 1.顺序表的概念 要理解顺序表,我们要先了解一下什么是线性表 线性表是n个具有相同特征的数据元素的序列 这就是一个线性表 a1是表头 a4是表尾 a2是a3的前驱 a3是a2的后继 空…...
OA系统如何做好DDOS防护
OA系统如何做好DDOS防护?在数字化办公蔚然成风的当下,OA(办公自动化)系统作为企业内部管理与协作的神经中枢,其安全性和稳定性直接关系到企业的日常运营效率、信息流通效率以及长远发展。OA系统不仅承载着企业内部的日…...
使用 Python 的 pyttsx3 库进行文本转语音
1. 什么是 pyttsx3? 1.1 pyttsx3 是一个 Python 库,它可以将文本转换为语音。与其他文本转语音库(如 gTTS)不同,pyttsx3 不依赖于网络服务,它使用本地的 TTS(Text-to-Speech)引擎&a…...
如何在Windows上编译OpenCV4.7.0
前言 参考:Win10 下编译 OpenCV 4.7.0详细全过程,包含xfeatures2d 这里在其基础上还出现了一些问题,仅供参考。 正文 一、环境 1、win10 2、cmake-gui 3、opencv4.7.0 4、VS2019 二、编译过程 1、下载需要的文件: 通…...
【玩转全栈】----Django连接MySQL
阅前先赞,养好习惯! 目录 1、ORM框架介绍 选择建议 2、安装mysqlclient 3、创建数据库 4、修改settings,连接数据库 5、对数据库进行操作 创建表 删除表 添加数据 删除数据 修改(更新)数据: 获取数据 1、OR…...
25/1/4 算法笔记<强化学习> 生成对抗模仿学习
基于生成对抗网络的模仿学习,假设存在一个专家智能体,其策略可以看成最优策略,我们就可以通过直接模仿这个专家在环境中交互的动作数据来训练一个策略,并不需要用到环境提供的奖励信息。 生成对抗模仿学习GAIL实质上就是模仿了专家…...
Flink维表方案选型
Iceberg Iceberg 采用全量预加载数据的方式将维度表数据全部加载到内存中进行关联,虽然可以避免频繁访问外部数据库,但对计算节点的内存消耗很高,不能适用于数量很大的维度表。除此之外,当 Iceberg 维表数据更新后,可…...
Oracle Database 23ai 新特性: UPDATE 和 DELETE 语句的直接联接
Oracle Database 23c 引入了一系列令人振奋的新特性,其中一项尤为引人注目的是对 UPDATE 和 DELETE 语句支持直接联接(Direct Join)。这一新功能极大地简化了复杂数据操作的实现,提升了性能,并为数据库开发者提供了更强…...
机器学习之随机森林算法实现和特征重要性排名可视化
随机森林算法实现和特征重要性排名可视化 目录 随机森林算法实现和特征重要性排名可视化1 随机森林算法1.1 概念1.2 主要特点1.3 优缺点1.4 步骤1.5 函数及参数1.5.1 函数导入1.5.2 参数 1.6 特征重要性排名 2 实际代码测试 1 随机森林算法 1.1 概念 是一种基于树模型的集成学…...
网络安全图谱以及溯源算法
本文提出了一种网络攻击溯源框架,以及一种网络安全知识图谱,该图由六个部分组成,G <H,V,A,E,L,S,R>。 1|11.知识图 网络知识图由六个部分组成,…...
单片机-外部中断
中断是指 CPU 在处理某一事件 A 时,发生了另一事件 B,请求 CPU 迅速去处理(中断发生);CPU 暂时停止当前的工作(中断响应), 转去处理事件 B(中断服务);待 CPU 将事件 B 处理完毕后,再回到原来事件 A 被中断的…...
超短脉冲激光自聚焦效应
前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应,这是一种非线性光学现象,主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场,对材料产生非线性响应,可能…...
反向工程与模型迁移:打造未来商品详情API的可持续创新体系
在电商行业蓬勃发展的当下,商品详情API作为连接电商平台与开发者、商家及用户的关键纽带,其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息(如名称、价格、库存等)的获取与展示,已难以满足市场对个性化、智能…...
C++.OpenGL (10/64)基础光照(Basic Lighting)
基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...
Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!
一、引言 在数据驱动的背景下,知识图谱凭借其高效的信息组织能力,正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合,探讨知识图谱开发的实现细节,帮助读者掌握该技术栈在实际项目中的落地方法。 …...
[Java恶补day16] 238.除自身以外数组的乘积
给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度…...
宇树科技,改名了!
提到国内具身智能和机器人领域的代表企业,那宇树科技(Unitree)必须名列其榜。 最近,宇树科技的一项新变动消息在业界引发了不少关注和讨论,即: 宇树向其合作伙伴发布了一封公司名称变更函称,因…...
深入理解Optional:处理空指针异常
1. 使用Optional处理可能为空的集合 在Java开发中,集合判空是一个常见但容易出错的场景。传统方式虽然可行,但存在一些潜在问题: // 传统判空方式 if (!CollectionUtils.isEmpty(userInfoList)) {for (UserInfo userInfo : userInfoList) {…...
【堆垛策略】设计方法
堆垛策略的设计是积木堆叠系统的核心,直接影响堆叠的稳定性、效率和容错能力。以下是分层次的堆垛策略设计方法,涵盖基础规则、优化算法和容错机制: 1. 基础堆垛规则 (1) 物理稳定性优先 重心原则: 大尺寸/重量积木在下…...
在 Visual Studio Code 中使用驭码 CodeRider 提升开发效率:以冒泡排序为例
目录 前言1 插件安装与配置1.1 安装驭码 CodeRider1.2 初始配置建议 2 示例代码:冒泡排序3 驭码 CodeRider 功能详解3.1 功能概览3.2 代码解释功能3.3 自动注释生成3.4 逻辑修改功能3.5 单元测试自动生成3.6 代码优化建议 4 驭码的实际应用建议5 常见问题与解决建议…...
Mac flutter环境搭建
一、下载flutter sdk 制作 Android 应用 | Flutter 中文文档 - Flutter 中文开发者网站 - Flutter 1、查看mac电脑处理器选择sdk 2、解压 unzip ~/Downloads/flutter_macos_arm64_3.32.2-stable.zip \ -d ~/development/ 3、添加环境变量 命令行打开配置环境变量文件 ope…...

