使用python-Spark使用的场景案例具体代码分析
使用场景
1. 数据批处理
• 日志分析:互联网公司每天会产生海量的服务器日志,如访问日志、应用程序日志等。Spark可以高效地读取这些日志文件,对数据进行清洗(例如去除无效记录、解析日志格式)、转换(例如提取关键信息如用户ID、访问时间、访问页面等)和分析(例如统计页面访问量、用户访问路径等)。
• 数据仓库ETL(Extract,Transform,Load):在构建数据仓库时,需要从各种数据源(如关系型数据库、文件系统等)提取数据,进行清洗、转换和加载到数据仓库中。Spark可以处理大规模的数据,并且通过其丰富的转换操作(如对数据进行聚合、关联等),能够很好地完成ETL流程。
2. 机器学习与数据挖掘
• 推荐系统:基于用户的行为数据(如购买记录、浏览历史等)和物品的属性数据,Spark MLlib(机器学习库)可以用于构建推荐模型。例如,使用协同过滤算法来发现用户的兴趣偏好,为用户推荐可能感兴趣的商品、电影、音乐等。
• 聚类分析:对于大规模的数据集,如客户细分场景下的用户特征数据,Spark可以应用聚类算法(如K - Means)将相似的用户或数据点聚集在一起,帮助企业更好地理解客户群体的结构,进行精准营销等活动。
3. 实时数据处理
• 实时监控与预警:在金融领域,Spark Streaming可以实时处理股票交易数据,计算关键指标(如实时股价波动、成交量变化等),当指标超出设定的阈值时,及时发出预警信号。
• 实时交通数据分析:通过接入交通传感器(如摄像头、测速仪等)的实时数据,Spark可以对交通流量、车速、拥堵情况等进行实时分析,为交通管理部门提供决策支持,如动态调整交通信号灯时间。
Spark使用场景的详细代码案例
1. 数据批处理 - 日志分析
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, count, col# 创建SparkSession
spark = SparkSession.builder.appName("LogAnalysis").getOrCreate()# 假设日志数据格式为每行: [IP地址, 时间戳, 请求方法, 请求路径, 协议, 状态码, 用户代理]
# 这里模拟一些日志数据
log_data = [("192.168.1.1", "2024-11-08 10:00:00", "GET", "/index.html", "HTTP/1.1", "200", "Mozilla/5.0"),("192.168.1.2", "2024-11-08 10:05:00", "GET", "/about.html", "HTTP/1.1", "200", "Chrome/100.0"),("192.168.1.1", "2024-11-08 10:10:00", "POST", "/login", "HTTP/1.1", "200", "Mozilla/5.0")
]columns = ["ip", "timestamp", "method", "path", "protocol", "status", "user_agent"]
df = spark.createDataFrame(log_data, columns)# 统计每个页面的访问次数
page_views = df.groupBy("path").agg(count("*").alias("views"))
page_views.show()# 统计每个IP的请求次数
ip_requests = df.groupBy("ip").agg(count("*").alias("requests"))
ip_requests.show()# 找出访问次数最多的前5个页面
top_pages = page_views.orderBy(col("views").desc()).limit(5)
top_pages.show()# 关闭SparkSession
spark.stop()
2. 数据批处理 - 数据仓库 ETL(以从CSV文件提取数据并加载到新表为例)
from pyspark.sql import SparkSession
import os# 创建SparkSession
spark = SparkSession.builder.appName("ETLExample").getOrCreate()# 假设源数据是一个CSV文件,路径为以下(这里可以替换为真实路径)
csv_path = "/path/to/source/csv/file.csv"
if not os.path.exists(csv_path):raise FileNotFoundError(f"CSV file not found at {csv_path}")# 读取CSV文件,假设CSV文件有列名:id, name, age
df = spark.read.csv(csv_path, header=True, inferSchema=True)# 进行一些数据转换,比如将年龄加1(这里只是示例,可以根据实际需求调整)
df = df.withColumn("new_age", df.age + 1)# 假设要将数据加载到一个新的Parquet格式文件中,路径为以下(可替换)
output_path = "/path/to/output/parquet/file.parquet"
df.write.mode("overwrite").parquet(output_path)# 关闭SparkSession
spark.stop()
3. 机器学习与数据挖掘 - 推荐系统(基于用户 - 物品评分的协同过滤)
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator# 创建SparkSession
spark = SparkSession.builder.appName("RecommendationSystem").getOrCreate()# 模拟用户 - 物品评分数据
data = [(1, 1, 5.0),(1, 2, 4.0),(2, 1, 3.0),(2, 2, 2.0),(2, 3, 4.0),(3, 1, 2.0),(3, 3, 5.0),(4, 2, 3.0),(4, 3, 4.0),
]columns = ["user_id", "item_id", "rating"]# 创建DataFrame
df = spark.createDataFrame(data, columns)# 划分训练集和测试集
(train_df, test_df) = df.randomSplit([0.8, 0.2])# 创建ALS模型
als = ALS(userCol="user_id", itemCol="item_id", ratingCol="rating", coldStartStrategy="drop")# 训练模型
model = als.fit(train_df)# 对测试集进行预测
predictions = model.transform(test_df)# 评估模型
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error = {rmse}")# 为用户生成推荐
user_recs = model.recommendForAllUsers(5)
user_recs.show(truncate=False)# 关闭SparkSession
spark.stop()
4. 机器学习与数据挖掘 - 聚类分析(K - Means 聚类)
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans# 创建SparkSession
spark = SparkSession.builder.appName("KMeansClustering").getOrCreate()# 模拟客户特征数据,这里假设每个客户有两个特征:年龄和收入
data = [(25, 50000),(30, 60000),(35, 70000),(40, 80000),(28, 55000),(32, 65000),
]columns = ["age", "income"]
df = spark.createDataFrame(data, columns)# 将特征列组合成一个向量列
assembler = VectorAssembler(inputCols=columns, outputCol="features")
df = assembler.transform(df)# 创建K - Means模型,设置聚类数为3
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(df)# 预测聚类结果
predictions = model.transform(df)
predictions.show()# 关闭SparkSession
spark.stop()
5. 实时数据处理 - 实时监控与预警(以简单的股票价格监控为例,使用Spark Streaming和Socket模拟实时数据)
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext# 创建SparkSession和StreamingContext,设置批处理间隔为5秒
spark = SparkSession.builder.appName("StockMonitoring").getOrCreate()
ssc = StreamingContext(spark.sparkContext, 5)# 这里使用Socket模拟接收实时股票价格数据,实际中可能是从消息队列等接收
# 假设数据格式为: 股票代码,价格
lines = ssc.socketTextStream("localhost", 9999)# 解析数据
data = lines.map(lambda line: line.split(","))# 将数据转换为DataFrame格式(这里只是简单示例,可能需要更多处理)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
schema = StructType([StructField("symbol", StringType(), True),StructField("price", DoubleType(), True)
])
df = data.toDF(schema)# 假设要监控某只股票(这里以股票代码为 'AAPL' 为例),当价格超过150时预警
apple_stock = df.filter(df.symbol == "AAPL")
alert = apple_stock.filter(df.price > 150).map(lambda row: f"Alert: {row.symbol} price {row.price} is high!")alert.pprint()# 启动 StreamingContext
ssc.start()
# 等待停止
ssc.awaitTermination()
6. 实时数据处理 - 实时交通数据分析(使用Spark Streaming和Kafka,假设已经有Kafka环境和交通数据主题)
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json# 创建SparkSession和StreamingContext,设置批处理间隔为10秒
spark = SparkSession.builder.appName("TrafficAnalysis").getOrCreate()
ssc = StreamingContext(spark.sparkContext, 10)# Kafka参数,这里需要替换为真实的Kafka服务器地址和交通数据主题
kafkaParams = {"metadata.broker.list": "localhost:9092"}
topic = "traffic_data"# 从Kafka读取实时交通数据
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)# 解析JSON格式的交通数据,假设数据包含车速、车流量等信息
def parse_traffic_data(json_str):try:data = json.loads(json_str)return (data["location"], data["speed"], data["volume"])except Exception as e:print(f"Error parsing data: {e}")return Noneparsed_data = kafkaStream.map(lambda x: parse_traffic_data(x[1]))
valid_data = parsed_data.filter(lambda x: x is not None)# 将数据转换为DataFrame格式(这里只是简单示例,可能需要更多处理)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
schema = StructType([StructField("location", StringType(), True),StructField("speed", DoubleType(), True),StructField("volume", DoubleType(), True)
])
df = valid_data.toDF(schema)# 计算每个区域的平均车速和车流量
average_speed_volume = df.groupBy("location").agg({"speed": "avg", "volume": "sum"})
average_speed_volume.pprint()# 启动 StreamingContext
ssc.start()
# 等待停止
ssc.awaitTermination()
相关文章:

使用python-Spark使用的场景案例具体代码分析
使用场景 1. 数据批处理 • 日志分析:互联网公司每天会产生海量的服务器日志,如访问日志、应用程序日志等。Spark可以高效地读取这些日志文件,对数据进行清洗(例如去除无效记录、解析日志格式)、转换(例如…...

如何查看本地的个人SSH密钥
1.确保你的电脑上安装了 Git。 你可以通过终端或命令提示符输入以下命令来检查: git --version 如果没有安装,请前往 Git 官网 下载并安装适合你操作系统的版本。 2.查找SSH密钥 默认情况下,SSH密钥存储在你的用户目录下的.ssh文件夹中。…...

本人认为 写程序的三大基本原则
1. 合法性 定义:合法性指的是程序必须遵守法律法规和道德规范,不得用于非法活动。 建议: 了解法律法规:在编写程序之前,了解并遵守所在国家或地区的法律法规,特别是与数据隐私、版权、网络安…...

A030-基于Spring boot的公司资产网站设计与实现
🙊作者简介:在校研究生,拥有计算机专业的研究生开发团队,分享技术代码帮助学生学习,独立完成自己的网站项目。 代码可以查看文章末尾⬇️联系方式获取,记得注明来意哦~🌹 赠送计算机毕业设计600…...

React Hooks 深度解析与实战
💓 博客主页:瑕疵的CSDN主页 📝 Gitee主页:瑕疵的gitee主页 ⏩ 文章专栏:《热点资讯》 React Hooks 深度解析与实战 React Hooks 深度解析与实战 React Hooks 深度解析与实战 引言 什么是 Hooks? 定义 为什么需要 Ho…...

#渗透测试#SRC漏洞挖掘#蓝队基础之网络七层杀伤链04 终章
网络杀伤链模型(Kill Chain Model)是一种用于描述和分析网络攻击各个阶段的框架。这个模型最初由洛克希德马丁公司提出,用于帮助企业和组织识别和防御网络攻击。网络杀伤链模型将网络攻击过程分解为多个阶段,每个阶段都有特定的活…...

计算机毕业设计Python+大模型农产品推荐系统 农产品爬虫 农产品商城 农产品大数据 农产品数据分析可视化 PySpark Hadoop
温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…...

爬虫补环境案例---问财网(rpc,jsdom,代理,selenium)
目录 一.环境检测 1. 什么是环境检测 2.案例讲解 二 .吐环境脚本 1. 简介 2. 基础使用方法 3.数据返回 4. 完整代理使用 5. 代理封装 6. 封装所有使用方法 jsdom补环境 1. 环境安装 2. 基本使用 3. 添加参数形式 Selenium补环境 1. 简介 2.实战案例 1. 逆向目…...

SpringBoot有几种获取Request对象的方法
HttpServletRequest 简称 Request,它是一个 Servlet API 提供的对象,用于获取客户端发起的 HTTP 请求信息。例如:获取请求参数、获取请求头、获取 Session 会话信息、获取请求的 IP 地址等信息。 那么问题来了,在 Spring Boot 中…...

在 Windows 11 中使用 MuMu 模拟器 12 国际版配置代理
**以下是优化后的教学内容,使用 Markdown 格式,便于粘贴到 CSDN 或其他支持 Markdown 格式的编辑器中: 在 Windows 11 中使用 MuMu 模拟器 12 国际版配置代理 MuMu 模拟器内有网络设置功能,可以直接在模拟器中配置代理。但如果你不确定如何操作,可以通过在 Windows 端设…...

ASP.NET Core Webapi 返回数据的三种方式
ASP.NET Core为Web API控制器方法返回类型提供了如下几个选择: Specific type IActionResult ActionResult<T> 1. 返回指定类型(Specific type) 最简单的API会返回原生的或者复杂的数据类型(比如,string 或者…...

SQL面试题——蚂蚁SQL面试题 连续3天减少碳排放量不低于100的用户
连续3天减少碳排放量不低于100的用户 这是一道来自蚂蚁的面试题目,要求我们找出连续3天减少碳排放量低于100的用户,之前我们分析过两道关于连续的问题了 SQL面试题——最大连续登陆问题 SQL面试题——球员连续四次得分 这两个问题都是跟连续有关的,但是球员连续得分的难…...

Python酷库之旅-第三方库Pandas(216)
目录 一、用法精讲 1011、pandas.DatetimeIndex.tz属性 1011-1、语法 1011-2、参数 1011-3、功能 1011-4、返回值 1011-5、说明 1011-6、用法 1011-6-1、数据准备 1011-6-2、代码示例 1011-6-3、结果输出 1012、pandas.DatetimeIndex.freq属性 1012-1、语法 1012…...

论文解析:计算能力资源的可信共享:利益驱动的异构网络服务提供机制
目录 论文解析:计算能力资源的可信共享:利益驱动的异构网络服务提供机制 KM-SMA算法 KM-SMA算法通过不断更新节点的可行顶点标记值(也称为顶标),利用匈牙利方法(Hungarian method)来获取匹配结果。在获取匹配结果后,该算法还会判断该结果是否满足Pareto最优性,即在没…...

Spring AOP技术
1.AOP基本介绍 AOP 的全称 (aspect oriented programming) ,面向切面编程。 1.和传统的面向对象不同。 面向切面编程是根据自我的需求,将切面类的方法切入到其他的类的方法中。(这么说抽象吧!来张图来解释。) 如图 传…...

数字IC实践项目(10)—基于System Verilog的DDR4 Model/Tb 及基础Verification IP的设计与验证(付费项目)
数字IC实践项目(10)—基于System Verilog的DDR4 Model/Tb 及基础Verification IP的设计与验证(付费项目) 前言项目框图1)DDR4 Verification IP2)DDR4 JEDEC Model & Tb 项目文件1)DDR4 Veri…...

MATLAB保存多帧图形为视频格式
基本思路 在Matlab中,要将drawnow绘制的多帧数据保存为视频格式,首先需要创建一个视频写入对象。这个对象用于将每一帧图像数据按照视频格式的要求进行组合和编码。然后,在每次drawnow更新绘图后,将当前的图形窗口内容捕获为一帧图…...

redis7.x源码分析:(3) dict字典
dict字典采用经典hash表数据结构实现,由键值对组成,类似于C中的unordered_map。两者在代码实现层面存在一些差异,比如gnustl的unordered_map分配的桶数组个数是(质数n),而dict分配的桶数组个数是࿰…...

连续九届EI稳定|江苏科技大学主办
【九届EI检索稳定|江苏科技大学主办 | IEEE出版 】 🎈【截稿倒计时】!!! ✨徐秘书:gsra_huang ✨往届均已检索,已上线IEEE官网 🎊第九届清洁能源与发电技术国际学术会议(CEPGT 2…...

HarmonyOS NEXT应用开发实战 ( 应用的签名、打包上架,各种证书详解)
前言 没经历过的童鞋,首次对HarmonyOS的应用签名打包上架可能感觉繁琐。需要各种秘钥证书生成和申请,混在一起也分不清。其实搞清楚后也就那会事,各个文件都有它存在的作用。 HarmonyOS通过数字证书与Profile文件等签名信息来保证鸿蒙应用/…...

【CICD】CICD 持续集成与持续交付在测试中的应用
一、什么是CICD? CI/CD 是指持续集成(Continuous Integration)和持续部署(Continuous Deployment)或持续交付(Continuous Delivery) 1.1 持续集成(Continuous Integration…...

Dolby TrueHD和Dolby Digital Plus (E-AC-3)编码介绍
文章目录 1. Dolby TrueHD特点总结 2. Dolby Digital Plus (E-AC-3)特点总结 Dolby TrueHD 与 Dolby Digital Plus (E-AC-3) 的对比 Dolby TrueHD和Dolby Digital Plus (E-AC-3) 是两种高级的杜比音频编码格式,常用于蓝光影碟、流媒体、影院等高品质音频传输场景。它…...

数字频率计的设计-- 基于 HDL 方法
目录 数字频率计的设计 1.计数、锁存与显示译码电路设计 2.主控电路设计 3.分频电路设计 4.顶层电路设计 伪随机序列发生器 的设计 数字频率计的设计 基于HDL设计数字系统时,可以根据需要应用Verilog HDL描述所需要的功能电路,既有利于节约资源&am…...

[程序员] 没有产生core文件的原因
最近和同事一块看一个core文件没有产生的问题,总结了一些在CSDN的专栏里。分析的过程,参考使用了ftrace的功能,感觉非常实用。 如果有需要可以参考。大体上就这么几种情况:信号的特殊处理,coredump相关的配置没有设置正确,文件系统访问权限问题,setuid相关的不匹配问题。…...

【数字图像处理+MATLAB】基于 Sobel 算子计算图像梯度并进行边缘增强:使用 imgradientxy 函数
引言 在图像处理中,边缘通常是图像中像素强度变化最大的地方,这种变化可以通过计算图像的梯度来量化。梯度是一个向量,它的方向指向像素强度增加最快的方向,它的大小(或者说幅度)表示像素强度增加的速度。…...

P10901 [蓝桥杯 2024 省 C] 封闭图形个数
铁子们好呀,今天博主给大家更新一道编程题!!! 题目链接如下: P10901 [蓝桥杯 2024 省 C] 封闭图形个数 好,接下来,我将从三个方面讲解这道例题。分别是 题目解析算法原理代码实现 文章目录 1.题…...

ubuntu-desktop-24.04上手指南(更新阿里源、安装ssh、安装chrome、设置固定IP、安装搜狗输入法)
ubuntu-desktop-24.04上手指南(更新阿里源、安装ssh、安装chrome、设置固定IP、安装搜狗输入法) 一、更新并安装基础软件 #切换root用户 sudo su -#更新 apt update #升级 apt upgrade#install vim apt install vim#install net-tools apt install net-tools二、安装ssh并设置…...

手机直连卫星NTN通信初步研究
目录 1、手机直连卫星之序幕 2、卫星NTN及其网络架构 2.1 NTN 2.2 NTN网络架构 3、NTN的3GPP标准化进程 3.1 NTN需要适应的特性 3.2 NTN频段 3.3 NTN的3GPP标准化进程概况 3.4 NTN的3GPP标准化进程的详情 3.4.1 NR-NTN 3.4.1.1 NTN 的无线相关 SI/WI 3.4.1.2…...

蓝桥杯c++算法学习【2】之搜索与查找(九宫格、穿越雷区、迷宫与陷阱、扫地机器人:::非常典型的必刷例题!!!)
别忘了请点个赞收藏关注支持一下博主喵!!! 关注博主,更多蓝桥杯nice题目静待更新:) 搜索与查找 一、九宫格 【问题描述】 小明最近在教邻居家的小朋友小学奥数,而最近正好讲述到了三阶幻方这个部分,三 …...

Android加载pdf
依赖 implementation com.squareup.okhttp3:okhttp:4.9.1 implementation com.github.barteksc:android-pdf-viewer:3.2.0-beta.1在project.build中添加该源 maven { url "https://repository.liferay.com/nexus/content/repositories/public/" }XML <LinearLa…...