使用python-Spark使用的场景案例具体代码分析
使用场景
1. 数据批处理
• 日志分析:互联网公司每天会产生海量的服务器日志,如访问日志、应用程序日志等。Spark可以高效地读取这些日志文件,对数据进行清洗(例如去除无效记录、解析日志格式)、转换(例如提取关键信息如用户ID、访问时间、访问页面等)和分析(例如统计页面访问量、用户访问路径等)。
• 数据仓库ETL(Extract,Transform,Load):在构建数据仓库时,需要从各种数据源(如关系型数据库、文件系统等)提取数据,进行清洗、转换和加载到数据仓库中。Spark可以处理大规模的数据,并且通过其丰富的转换操作(如对数据进行聚合、关联等),能够很好地完成ETL流程。
2. 机器学习与数据挖掘
• 推荐系统:基于用户的行为数据(如购买记录、浏览历史等)和物品的属性数据,Spark MLlib(机器学习库)可以用于构建推荐模型。例如,使用协同过滤算法来发现用户的兴趣偏好,为用户推荐可能感兴趣的商品、电影、音乐等。
• 聚类分析:对于大规模的数据集,如客户细分场景下的用户特征数据,Spark可以应用聚类算法(如K - Means)将相似的用户或数据点聚集在一起,帮助企业更好地理解客户群体的结构,进行精准营销等活动。
3. 实时数据处理
• 实时监控与预警:在金融领域,Spark Streaming可以实时处理股票交易数据,计算关键指标(如实时股价波动、成交量变化等),当指标超出设定的阈值时,及时发出预警信号。
• 实时交通数据分析:通过接入交通传感器(如摄像头、测速仪等)的实时数据,Spark可以对交通流量、车速、拥堵情况等进行实时分析,为交通管理部门提供决策支持,如动态调整交通信号灯时间。
Spark使用场景的详细代码案例
1. 数据批处理 - 日志分析
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, count, col# 创建SparkSession
spark = SparkSession.builder.appName("LogAnalysis").getOrCreate()# 假设日志数据格式为每行: [IP地址, 时间戳, 请求方法, 请求路径, 协议, 状态码, 用户代理]
# 这里模拟一些日志数据
log_data = [("192.168.1.1", "2024-11-08 10:00:00", "GET", "/index.html", "HTTP/1.1", "200", "Mozilla/5.0"),("192.168.1.2", "2024-11-08 10:05:00", "GET", "/about.html", "HTTP/1.1", "200", "Chrome/100.0"),("192.168.1.1", "2024-11-08 10:10:00", "POST", "/login", "HTTP/1.1", "200", "Mozilla/5.0")
]columns = ["ip", "timestamp", "method", "path", "protocol", "status", "user_agent"]
df = spark.createDataFrame(log_data, columns)# 统计每个页面的访问次数
page_views = df.groupBy("path").agg(count("*").alias("views"))
page_views.show()# 统计每个IP的请求次数
ip_requests = df.groupBy("ip").agg(count("*").alias("requests"))
ip_requests.show()# 找出访问次数最多的前5个页面
top_pages = page_views.orderBy(col("views").desc()).limit(5)
top_pages.show()# 关闭SparkSession
spark.stop()
2. 数据批处理 - 数据仓库 ETL(以从CSV文件提取数据并加载到新表为例)
from pyspark.sql import SparkSession
import os# 创建SparkSession
spark = SparkSession.builder.appName("ETLExample").getOrCreate()# 假设源数据是一个CSV文件,路径为以下(这里可以替换为真实路径)
csv_path = "/path/to/source/csv/file.csv"
if not os.path.exists(csv_path):raise FileNotFoundError(f"CSV file not found at {csv_path}")# 读取CSV文件,假设CSV文件有列名:id, name, age
df = spark.read.csv(csv_path, header=True, inferSchema=True)# 进行一些数据转换,比如将年龄加1(这里只是示例,可以根据实际需求调整)
df = df.withColumn("new_age", df.age + 1)# 假设要将数据加载到一个新的Parquet格式文件中,路径为以下(可替换)
output_path = "/path/to/output/parquet/file.parquet"
df.write.mode("overwrite").parquet(output_path)# 关闭SparkSession
spark.stop()
3. 机器学习与数据挖掘 - 推荐系统(基于用户 - 物品评分的协同过滤)
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator# 创建SparkSession
spark = SparkSession.builder.appName("RecommendationSystem").getOrCreate()# 模拟用户 - 物品评分数据
data = [(1, 1, 5.0),(1, 2, 4.0),(2, 1, 3.0),(2, 2, 2.0),(2, 3, 4.0),(3, 1, 2.0),(3, 3, 5.0),(4, 2, 3.0),(4, 3, 4.0),
]columns = ["user_id", "item_id", "rating"]# 创建DataFrame
df = spark.createDataFrame(data, columns)# 划分训练集和测试集
(train_df, test_df) = df.randomSplit([0.8, 0.2])# 创建ALS模型
als = ALS(userCol="user_id", itemCol="item_id", ratingCol="rating", coldStartStrategy="drop")# 训练模型
model = als.fit(train_df)# 对测试集进行预测
predictions = model.transform(test_df)# 评估模型
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error = {rmse}")# 为用户生成推荐
user_recs = model.recommendForAllUsers(5)
user_recs.show(truncate=False)# 关闭SparkSession
spark.stop()
4. 机器学习与数据挖掘 - 聚类分析(K - Means 聚类)
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans# 创建SparkSession
spark = SparkSession.builder.appName("KMeansClustering").getOrCreate()# 模拟客户特征数据,这里假设每个客户有两个特征:年龄和收入
data = [(25, 50000),(30, 60000),(35, 70000),(40, 80000),(28, 55000),(32, 65000),
]columns = ["age", "income"]
df = spark.createDataFrame(data, columns)# 将特征列组合成一个向量列
assembler = VectorAssembler(inputCols=columns, outputCol="features")
df = assembler.transform(df)# 创建K - Means模型,设置聚类数为3
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(df)# 预测聚类结果
predictions = model.transform(df)
predictions.show()# 关闭SparkSession
spark.stop()
5. 实时数据处理 - 实时监控与预警(以简单的股票价格监控为例,使用Spark Streaming和Socket模拟实时数据)
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext# 创建SparkSession和StreamingContext,设置批处理间隔为5秒
spark = SparkSession.builder.appName("StockMonitoring").getOrCreate()
ssc = StreamingContext(spark.sparkContext, 5)# 这里使用Socket模拟接收实时股票价格数据,实际中可能是从消息队列等接收
# 假设数据格式为: 股票代码,价格
lines = ssc.socketTextStream("localhost", 9999)# 解析数据
data = lines.map(lambda line: line.split(","))# 将数据转换为DataFrame格式(这里只是简单示例,可能需要更多处理)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
schema = StructType([StructField("symbol", StringType(), True),StructField("price", DoubleType(), True)
])
df = data.toDF(schema)# 假设要监控某只股票(这里以股票代码为 'AAPL' 为例),当价格超过150时预警
apple_stock = df.filter(df.symbol == "AAPL")
alert = apple_stock.filter(df.price > 150).map(lambda row: f"Alert: {row.symbol} price {row.price} is high!")alert.pprint()# 启动 StreamingContext
ssc.start()
# 等待停止
ssc.awaitTermination()
6. 实时数据处理 - 实时交通数据分析(使用Spark Streaming和Kafka,假设已经有Kafka环境和交通数据主题)
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json# 创建SparkSession和StreamingContext,设置批处理间隔为10秒
spark = SparkSession.builder.appName("TrafficAnalysis").getOrCreate()
ssc = StreamingContext(spark.sparkContext, 10)# Kafka参数,这里需要替换为真实的Kafka服务器地址和交通数据主题
kafkaParams = {"metadata.broker.list": "localhost:9092"}
topic = "traffic_data"# 从Kafka读取实时交通数据
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)# 解析JSON格式的交通数据,假设数据包含车速、车流量等信息
def parse_traffic_data(json_str):try:data = json.loads(json_str)return (data["location"], data["speed"], data["volume"])except Exception as e:print(f"Error parsing data: {e}")return Noneparsed_data = kafkaStream.map(lambda x: parse_traffic_data(x[1]))
valid_data = parsed_data.filter(lambda x: x is not None)# 将数据转换为DataFrame格式(这里只是简单示例,可能需要更多处理)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
schema = StructType([StructField("location", StringType(), True),StructField("speed", DoubleType(), True),StructField("volume", DoubleType(), True)
])
df = valid_data.toDF(schema)# 计算每个区域的平均车速和车流量
average_speed_volume = df.groupBy("location").agg({"speed": "avg", "volume": "sum"})
average_speed_volume.pprint()# 启动 StreamingContext
ssc.start()
# 等待停止
ssc.awaitTermination()
相关文章:
使用python-Spark使用的场景案例具体代码分析
使用场景 1. 数据批处理 • 日志分析:互联网公司每天会产生海量的服务器日志,如访问日志、应用程序日志等。Spark可以高效地读取这些日志文件,对数据进行清洗(例如去除无效记录、解析日志格式)、转换(例如…...
如何查看本地的个人SSH密钥
1.确保你的电脑上安装了 Git。 你可以通过终端或命令提示符输入以下命令来检查: git --version 如果没有安装,请前往 Git 官网 下载并安装适合你操作系统的版本。 2.查找SSH密钥 默认情况下,SSH密钥存储在你的用户目录下的.ssh文件夹中。…...
本人认为 写程序的三大基本原则
1. 合法性 定义:合法性指的是程序必须遵守法律法规和道德规范,不得用于非法活动。 建议: 了解法律法规:在编写程序之前,了解并遵守所在国家或地区的法律法规,特别是与数据隐私、版权、网络安…...
A030-基于Spring boot的公司资产网站设计与实现
🙊作者简介:在校研究生,拥有计算机专业的研究生开发团队,分享技术代码帮助学生学习,独立完成自己的网站项目。 代码可以查看文章末尾⬇️联系方式获取,记得注明来意哦~🌹 赠送计算机毕业设计600…...
React Hooks 深度解析与实战
💓 博客主页:瑕疵的CSDN主页 📝 Gitee主页:瑕疵的gitee主页 ⏩ 文章专栏:《热点资讯》 React Hooks 深度解析与实战 React Hooks 深度解析与实战 React Hooks 深度解析与实战 引言 什么是 Hooks? 定义 为什么需要 Ho…...
#渗透测试#SRC漏洞挖掘#蓝队基础之网络七层杀伤链04 终章
网络杀伤链模型(Kill Chain Model)是一种用于描述和分析网络攻击各个阶段的框架。这个模型最初由洛克希德马丁公司提出,用于帮助企业和组织识别和防御网络攻击。网络杀伤链模型将网络攻击过程分解为多个阶段,每个阶段都有特定的活…...
计算机毕业设计Python+大模型农产品推荐系统 农产品爬虫 农产品商城 农产品大数据 农产品数据分析可视化 PySpark Hadoop
温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…...
爬虫补环境案例---问财网(rpc,jsdom,代理,selenium)
目录 一.环境检测 1. 什么是环境检测 2.案例讲解 二 .吐环境脚本 1. 简介 2. 基础使用方法 3.数据返回 4. 完整代理使用 5. 代理封装 6. 封装所有使用方法 jsdom补环境 1. 环境安装 2. 基本使用 3. 添加参数形式 Selenium补环境 1. 简介 2.实战案例 1. 逆向目…...
SpringBoot有几种获取Request对象的方法
HttpServletRequest 简称 Request,它是一个 Servlet API 提供的对象,用于获取客户端发起的 HTTP 请求信息。例如:获取请求参数、获取请求头、获取 Session 会话信息、获取请求的 IP 地址等信息。 那么问题来了,在 Spring Boot 中…...
在 Windows 11 中使用 MuMu 模拟器 12 国际版配置代理
**以下是优化后的教学内容,使用 Markdown 格式,便于粘贴到 CSDN 或其他支持 Markdown 格式的编辑器中: 在 Windows 11 中使用 MuMu 模拟器 12 国际版配置代理 MuMu 模拟器内有网络设置功能,可以直接在模拟器中配置代理。但如果你不确定如何操作,可以通过在 Windows 端设…...
ASP.NET Core Webapi 返回数据的三种方式
ASP.NET Core为Web API控制器方法返回类型提供了如下几个选择: Specific type IActionResult ActionResult<T> 1. 返回指定类型(Specific type) 最简单的API会返回原生的或者复杂的数据类型(比如,string 或者…...
SQL面试题——蚂蚁SQL面试题 连续3天减少碳排放量不低于100的用户
连续3天减少碳排放量不低于100的用户 这是一道来自蚂蚁的面试题目,要求我们找出连续3天减少碳排放量低于100的用户,之前我们分析过两道关于连续的问题了 SQL面试题——最大连续登陆问题 SQL面试题——球员连续四次得分 这两个问题都是跟连续有关的,但是球员连续得分的难…...
Python酷库之旅-第三方库Pandas(216)
目录 一、用法精讲 1011、pandas.DatetimeIndex.tz属性 1011-1、语法 1011-2、参数 1011-3、功能 1011-4、返回值 1011-5、说明 1011-6、用法 1011-6-1、数据准备 1011-6-2、代码示例 1011-6-3、结果输出 1012、pandas.DatetimeIndex.freq属性 1012-1、语法 1012…...
论文解析:计算能力资源的可信共享:利益驱动的异构网络服务提供机制
目录 论文解析:计算能力资源的可信共享:利益驱动的异构网络服务提供机制 KM-SMA算法 KM-SMA算法通过不断更新节点的可行顶点标记值(也称为顶标),利用匈牙利方法(Hungarian method)来获取匹配结果。在获取匹配结果后,该算法还会判断该结果是否满足Pareto最优性,即在没…...
Spring AOP技术
1.AOP基本介绍 AOP 的全称 (aspect oriented programming) ,面向切面编程。 1.和传统的面向对象不同。 面向切面编程是根据自我的需求,将切面类的方法切入到其他的类的方法中。(这么说抽象吧!来张图来解释。) 如图 传…...
数字IC实践项目(10)—基于System Verilog的DDR4 Model/Tb 及基础Verification IP的设计与验证(付费项目)
数字IC实践项目(10)—基于System Verilog的DDR4 Model/Tb 及基础Verification IP的设计与验证(付费项目) 前言项目框图1)DDR4 Verification IP2)DDR4 JEDEC Model & Tb 项目文件1)DDR4 Veri…...
MATLAB保存多帧图形为视频格式
基本思路 在Matlab中,要将drawnow绘制的多帧数据保存为视频格式,首先需要创建一个视频写入对象。这个对象用于将每一帧图像数据按照视频格式的要求进行组合和编码。然后,在每次drawnow更新绘图后,将当前的图形窗口内容捕获为一帧图…...
redis7.x源码分析:(3) dict字典
dict字典采用经典hash表数据结构实现,由键值对组成,类似于C中的unordered_map。两者在代码实现层面存在一些差异,比如gnustl的unordered_map分配的桶数组个数是(质数n),而dict分配的桶数组个数是࿰…...
连续九届EI稳定|江苏科技大学主办
【九届EI检索稳定|江苏科技大学主办 | IEEE出版 】 🎈【截稿倒计时】!!! ✨徐秘书:gsra_huang ✨往届均已检索,已上线IEEE官网 🎊第九届清洁能源与发电技术国际学术会议(CEPGT 2…...
HarmonyOS NEXT应用开发实战 ( 应用的签名、打包上架,各种证书详解)
前言 没经历过的童鞋,首次对HarmonyOS的应用签名打包上架可能感觉繁琐。需要各种秘钥证书生成和申请,混在一起也分不清。其实搞清楚后也就那会事,各个文件都有它存在的作用。 HarmonyOS通过数字证书与Profile文件等签名信息来保证鸿蒙应用/…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...
【JVM】- 内存结构
引言 JVM:Java Virtual Machine 定义:Java虚拟机,Java二进制字节码的运行环境好处: 一次编写,到处运行自动内存管理,垃圾回收的功能数组下标越界检查(会抛异常,不会覆盖到其他代码…...
Linux云原生安全:零信任架构与机密计算
Linux云原生安全:零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言:云原生安全的范式革命 随着云原生技术的普及,安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测,到2025年,零信任架构将成为超…...
相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...
C++ 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...
【论文阅读28】-CNN-BiLSTM-Attention-(2024)
本文把滑坡位移序列拆开、筛优质因子,再用 CNN-BiLSTM-Attention 来动态预测每个子序列,最后重构出总位移,预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵(S…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问(基础概念问题) 1. 请解释Spring框架的核心容器是什么?它在Spring中起到什么作用? Spring框架的核心容器是IoC容器&#…...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...
【C++】纯虚函数类外可以写实现吗?
1. 答案 先说答案,可以。 2.代码测试 .h头文件 #include <iostream> #include <string>// 抽象基类 class AbstractBase { public:AbstractBase() default;virtual ~AbstractBase() default; // 默认析构函数public:virtual int PureVirtualFunct…...
