基于Scala开发Spark ML的ALS推荐模型实战
推荐系统,广泛应用到电商,营销行业。本文通过Scala,开发Spark ML的ALS算法训练推荐模型,用于电影评分预测推荐。
算法简介
ALS算法是Spark ML中实现协同过滤的矩阵分解方法。
ALS,即交替最小二乘法(Alternating Least Squares),是协同过滤技术中的一种经典算法。它通过对用户和物品的潜在特征进行建模,来预测用户对未知物品的评分或偏好。具体介绍如下:
- 矩阵分解模型:在推荐系统中,我们通常有一个用户-物品的评分矩阵,其中行表示用户,列表示物品,矩阵中的值代表用户对物品的评分。然而,这个矩阵通常是非常稀疏的,因为用户只给少数物品评分。ALS算法就是在这样的不完整评分矩阵上操作,通过矩阵分解来补全缺失值,进而产生推荐。
- 算法原理:ALS算法的核心思想是通过迭代过程更新用户和物品的潜在因子向量。在每次迭代中,一个评分被建模为用户潜在特征向量和物品潜在特征向量的点积,加上一个偏差项。通过最小化实际评分和预测评分之间的差异来不断优化这些潜在特征向量。
- Spark ML实现:在Spark ML库中,ALS算法被用于处理大规模的数据集,并提供了多种参数以适应不同的数据特性和需求。例如,可以设置潜在因子的数量、正则化参数、迭代次数等。此外,Spark ML的ALS还支持隐式反馈数据的变体,这对于无法获取明确评分的数据非常有用。
总的来说,ALS是一种强大的推荐系统算法,尤其适用于处理大规模稀疏数据集。通过合理地选择和调整参数,可以在保持高效计算的同时获得良好的推荐质量。
代码实战
pom.xml文件更新,加入相关依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>sparkGNU2023</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.version>2.13</scala.version><spark.version>3.4.1</spark.version><log4j.version>1.2.17</log4j.version><slf4j.version>1.7.22</slf4j.version></properties><dependencies><!--日志相关依赖--><dependency><groupId>org.slf4j</groupId><artifactId>jcl-over-slf4j</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>com.thoughtworks.paranamer</groupId><artifactId>paranamer</artifactId><version>2.8</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.13</artifactId><version>3.4.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.13</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.13</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.13</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.13</artifactId><version>3.4.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.13</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.4.8</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.30</version></dependency><dependency><groupId>org.apache.flume.flume-ng-clients</groupId><artifactId>flume-ng-log4jappender</artifactId><version>1.11.0</version></dependency><!-- flume 拦截器相关依赖--><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.6.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>
训练ALS模型
基于scala训练ALS模型
package base.charpter10import breeze.linalg.sum
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.functions.{col, count, explode, when}
import org.apache.spark.sql.{DataFrame, SparkSession}/*** @projectName sparkGNU2023 * @package base.charpter10 * @className base.charpter10.MovieRecommender * @description ${description} * @author pblh123* @date 2024/3/29 15:18* @version 1.0**/object MovieRecommender {def main(args: Array[String]): Unit = {// 创建Spark会话val spark = SparkSession.builder().appName("MovieRecommender").master("local[*]").getOrCreate()import spark.implicits._// 假设我们有一个用户-物品评分数据集,格式为(userId, itemId, rating)/*** UserID,MovieID,Rating,Timestamp* 1,1193,5,978300760* 1,661,3,978302109*/// 指定CSV文件的路径,以及解析选项val csvFilePath = "data/ratings.csv"val csvOptions = Map("header" -> "true", // 是否有列名头"inferSchema" -> "true", // 是否自动推断数据类型"encoding" -> "UTF-8", // 如果有特定的编码格式,例如对于包含中文的CSV文件:)// 读取CSV文件并创建DataFrameval ratingsDF = spark.read.format("csv").options(csvOptions).load(csvFilePath)// 显示DataFrame的前几行以验证数据是否正确加载println("查看原始据数据样例:")ratingsDF.show(5)val ratings: DataFrame = ratingsDF.select("UserID", "MovieID", "Rating").withColumnRenamed("UserID", "userId").withColumnRenamed("MovieID", "itemId").withColumnRenamed("Rating", "rating")// 将数据集分割为训练集和测试集val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))println("查看训练集数据")training.show(5)println("查看测试集数据")test.show(5)// 设置ALS参数// 创建一个ALS实例并配置参数val als = new ALS().setMaxIter(10) // 设置最大迭代次数为5,10,本地测试时,设置过大,会报错.setRegParam(0.01) // 设置正则化参数为0.01.setUserCol("userId") // 设置用户列名为"userId".setItemCol("itemId") // 设置物品列名为"itemId".setRatingCol("rating") // 设置评分列名为"rating"/*** ALS(Alternating Least Squares)是一种基于矩阵分解的协同过滤算法,用于处理用户和物品之间的评分数据。各参数说明如下:* setMaxIter: 设置最大迭代次数,决定模型训练的精细程度。迭代次数越多,模型通常越精确,但训练时间也可能更长。* setRegParam: 设置正则化参数,用于控制模型的复杂度和过拟合程度。较小的正则化参数值可能导致模型过复杂,容易过拟合;较大的值则可能导致模型过于简单,欠拟合。* setUserCol, setItemCol, setRatingCol: 分别设置用户ID列、物品ID列和评分列的名称。这些列名根据实际的数据结构来确定,用于告诉ALS算法在哪些列中查找用户、物品和评分信息。*/// 训练ALS模型println("开始训练模型")val model = als.fit(training)// 对测试集进行预测val predictions = model.transform(test)predictions.show()predictions.filter($"rating".isNotNull && $"prediction".isNotNull).count() // 确认有非空的评分和预测值// 评估模型val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction")val rmse = evaluator.evaluate(predictions)println(s"Root-mean-square error = $rmse")// 为用户生成推荐// 该函数是基于一个模型(model)为所有用户推荐项目的函数。它将为每个用户推荐5个项目/*** +------+--------------------------------------------------------------------------------------------+* |userId|recommendations[{itemid,pred_rating},{itemid,pred_rating},...] |* +------+--------------------------------------------------------------------------------------------+* |12 |[{1864, 9.721167}, {2964, 8.815781}, {3867, 8.480173}, {1539, 7.8904114}, {563, 7.8829007}] |* |22 |[{2964, 6.090676}, {3215, 5.6165895}, {1534, 5.4731245}, {718, 5.462125}, {2632, 5.4482727}]|*/val userRecs = model.recommendForAllUsers(5)userRecs.show(5,false)println("保存预测结果")
// userRecs.write.mode("overwrite").parquet("models/recomALSmodel") // 保存为parquet格式,一般用于集群中
// userRecs是一个DataFrame,其中"recommendations"列是数组类型val explodedUserRecs = userRecs.withColumn("recommendations", explode($"recommendations")).select($"userId", $"recommendations.itemId".as("itemId"), $"recommendations.rating".as("PredRating"))explodedUserRecs.write.mode("overwrite").format("csv").save("predictRes/recomALS") // PC 调试使用// 保存模型到指定路径val modelPath = "models/recomALSmodel"model.write.overwrite().save(modelPath)println(s"Model saved to $modelPath")// 停止Spark会话spark.stop()/*当程序试图停止Spark会话时,可能会触发清理临时文件的操作,从而导致出现NoSuchFileException异常。通常情况下,这不是代码逻辑的问题,而是Spark内部在清理资源时可能出现的问题。可以尝试重启Spark环境或者适当增大Spark的临时目录空间来避免此类问题。*/}}
运行代码,效果图如下
TodoList:目前RMSE计算出问题,原数据清洗没有做,模型参数还可以调整。后期调整更新后,再发一篇文章。
使用训练的模型预测新数据
scala开发应用模型demo代码
package base.charpter10import org.apache.spark.ml.recommendation.ALSModel
import org.apache.spark.sql.SparkSession/*** @projectName sparkGNU2023 * @package base.charpter10 * @className base.charpter10.RecommendationModelLoadDemo * @description ${description} * @author pblh123* @date 2024/3/29 15:36* @version 1.0**/object RecommendationModelLoadDemo {def main(args: Array[String]): Unit = {// 创建Spark会话val spark = SparkSession.builder().master("local[*]").appName("RecommendationModelUsageDemo").getOrCreate()import spark.implicits._// 加载之前保存的ALS模型val modelPath = "models/recomALSmodel"val loadedModel: ALSModel = ALSModel.load(modelPath)// 假设我们有一些新的用户-物品对,我们想要预测它们的评分val userItemPairs = Seq((1, 4), // 用户1对物品4的评分预测(2, 2) // 用户2对物品2的评分预测).toDF("userId", "itemId")// 使用模型进行评分预测val predictions = loadedModel.transform(userItemPairs)predictions.show()// 现在,假设我们想要为用户1生成前N个推荐物品val numRecommendations = 5 // 为用户推荐的物品数量val userRecs = loadedModel.recommendForAllUsers(numRecommendations)userRecs.show(5,false)// 停止Spark会话spark.stop()}}
运行效果如下
评估效果说明:目前的预测评分不合理,是因为模型没有经过精挑,优化,预测的记过会依据预测评分高低排序,选取得分高的前5个结果返回。后期模型调优后,结果就正常了。
相关文章:

基于Scala开发Spark ML的ALS推荐模型实战
推荐系统,广泛应用到电商,营销行业。本文通过Scala,开发Spark ML的ALS算法训练推荐模型,用于电影评分预测推荐。 算法简介 ALS算法是Spark ML中实现协同过滤的矩阵分解方法。 ALS,即交替最小二乘法(Alte…...
Go语言和Java编程语言的主要区别
目录 1.设计理念: 2.语法: 3.性能: 4.并发性: 5.内存管理: 6.标准库: 7.社区和支持: 8.应用领域: Go(也称为Golang)和Java是两种不同的编程语言&…...
【TypeScript系列】与其它构建工具整合
与其它构建工具整合 构建工具 BabelBrowserifyDuoGruntGulpJspmWebpackMSBuildNuGet Babel 安装 npm install babel/cli babel/core babel/preset-typescript --save-dev.babelrc {"presets": ["babel/preset-typescript"] }使用命令行工具 ./node_…...

Java | Leetcode Java题解之第12题整数转罗马数字
题解: 题解: class Solution {String[] thousands {"", "M", "MM", "MMM"};String[] hundreds {"", "C", "CC", "CCC", "CD", "D", "DC…...

哈佛大学商业评论 --- 第五篇:智能眼镜之战
AR将全面融入公司发展战略! AR将成为人类和机器之间的新接口! AR将成为人类的关键技术之一! 请将此文转发给您的老板! --- 专题作者:Michael E.Porter和James E.Heppelmann 虽然物理世界是三维的,但大多…...

paddlepaddle模型转换onnx指导文档
一、检查本机cuda版本 1、右键找到invdia控制面板 2、找到系统信息 3、点开“组件”选项卡, 可以看到cuda版本,我们这里是cuda11.7 cuda驱动版本为516.94 二、安装paddlepaddle环境 1、获取pip安装命令 ,我们到paddlepaddle官网ÿ…...

图像处理与视觉感知---期末复习重点(6)
文章目录 一、图像分割二、间断检测2.1 概述2.2 点检测2.3 线检测2.4 边缘检测 三、边缘连接3.1 概述3.2 Hough变换3.3 例子3.4 Hough变换的具体步骤3.5 Hough变换的法线表示形式3.6 Hough变换的扩展 四、阈值处理4.1 概述4.2 计算基本全局阈值算法4.3 自适应阈值 五、基于区域…...
git 如何删除本地和远程分支
删除本地分支 确认当前分支:首先,确保你没有在要删除的分支上。你可以通过运行git branch命令来查看当前的分支。 切换分支:如果你在要删除的分支上,需要先切换到另一个分支。例如,切换到main分支,可以使用…...
Kong基于QPS、IP限流
Rate Limiting限流插件 https://docs.konghq.com/hub/kong-inc/rate-limiting/ 它可以针对consumer ,credential ,ip ,service,path,header 等多种维度来进行限流.流量控制的精准度也有多种方式可以参考,比如可以做到秒级,分钟级,小时级等限流控制. 基于IP限流 源码地址&…...

基于springboot实现甘肃非物质文化网站系统项目【项目源码+论文说明】
摘要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本甘肃非物质文化网站就是在这样的大环境下诞生,其可以帮助管理者在短时间内处理完毕庞大的数据信…...

【瑞萨RA6M3】1. 基于 vscode 搭建开发环境
基于 vscode 搭建开发环境 1. 准备2. 安装2.1. 安装瑞萨软件包2.2. 安装编译器2.3. 安装 cmake2.4. 安装 openocd2.5. 安装 ninja2.6. 安装 make 3. 生成初始代码4. 修改 cmake 脚本5. 调试准备6. 仿真 1. 准备 需要瑞萨仓库中的两个软件: MDK_Device_Packs.zipse…...

使用pip install替代conda install将packet下载到anaconda虚拟环境
问题描述 使用conda install 下载 stable_baseline3出现问题 一番搜索下是Anaconda.org缺少源 解决方法 首先使用管理员权限打开 anaconda prompt 然后激活目标环境:conda activate env_name 接着使用:conda env list查看目标env的位置 如D:\anacon…...

【HTML】常用CSS属性
文章目录 前言1、字体和文本属性2、边距和填充3、border边框4、列表属性 前言 上一篇我们学习了CSS扩展选择器以及它的继承性,对于页面元素样式设置相信大家都不陌生了。 这一篇我们就来看看具体都有哪些样式可以设置?又该如何设置? 喜欢的【…...
python中的print(f‘‘)具体用法
在Python中,print(f) 是格式化字符串(f-string)的语法,它允许你在字符串中嵌入表达式,这些表达式在运行时会被其值所替换。f 或 F 前缀表示这是一个格式化字符串字面量。 在 f 或 F 中的大括号 {} 内,你可…...
《青少年成长管理2024》022 “成长七要素之三:文化”4/5
《青少年成长管理2024》022 “成长七要素之三:文化”4/5 七、物质文化(一)什么是物质文化(二)物质文化的分类(三)人类物质文化最新成果有哪些(四)青少年了解物质文化的途…...

Linux(05) Debian 系统修改主机名
查看主机名 方法1:hostname hostname 方法2:cat etc/hostname cat /etc/hostname 如果在创建Linux系统的时候忘记修改主机名,可以采用以下的方式来修改主机名称。 修改主机名 注意,在linux中下划线“_”可能是无效的字符&…...

之前翻硬币问题胡思乱想的完善
题目背景 小明正在玩一个“翻硬币”的游戏。 题目描述 桌上放着排成一排的若干硬币。我们用 * 表示正面,用 o 表示反面(是小写字母,不是零),比如可能情形是 **oo***oooo,如果同时翻转左边的两个硬币&#x…...

前端与后端协同:实现Excel导入导出功能
🌟 前言 欢迎来到我的技术小宇宙!🌌 这里不仅是我记录技术点滴的后花园,也是我分享学习心得和项目经验的乐园。📚 无论你是技术小白还是资深大牛,这里总有一些内容能触动你的好奇心。🔍 &#x…...

Docker:探索容器化技术,重塑云计算时代应用交付与管理
一,引言 在云计算时代,随着开发者逐步将应用迁移至云端以减轻硬件管理负担,软件配置与环境一致性问题日益凸显。Docker的横空出世,恰好为软件开发者带来了全新的解决方案,它革新了软件的打包、分发和管理方式ÿ…...

畅捷通T+ KeyInfoList.aspx SQL漏洞复现
0x01 产品简介 畅捷通 T+ 是一款灵动,智慧,时尚的基于互联网时代开发的管理软件,主要针对中小型工贸与商贸企业,尤其适合有异地多组织机构(多工厂,多仓库,多办事处,多经销商)的企业,涵盖了财务,业务,生产等领域的应用,产品应用功能包括:采购管理、库存管理、销售…...
Cursor实现用excel数据填充word模版的方法
cursor主页:https://www.cursor.com/ 任务目标:把excel格式的数据里的单元格,按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例,…...

【kafka】Golang实现分布式Masscan任务调度系统
要求: 输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。 服务端程序: 从kafka消费者接收…...
ubuntu搭建nfs服务centos挂载访问
在Ubuntu上设置NFS服务器 在Ubuntu上,你可以使用apt包管理器来安装NFS服务器。打开终端并运行: sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享,例如/shared: sudo mkdir /shared sud…...

K8S认证|CKS题库+答案| 11. AppArmor
目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...
Objective-C常用命名规范总结
【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名(Class Name)2.协议名(Protocol Name)3.方法名(Method Name)4.属性名(Property Name)5.局部变量/实例变量(Local / Instance Variables&…...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...
反射获取方法和属性
Java反射获取方法 在Java中,反射(Reflection)是一种强大的机制,允许程序在运行时访问和操作类的内部属性和方法。通过反射,可以动态地创建对象、调用方法、改变属性值,这在很多Java框架中如Spring和Hiberna…...
Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?
Redis 的发布订阅(Pub/Sub)模式与专业的 MQ(Message Queue)如 Kafka、RabbitMQ 进行比较,核心的权衡点在于:简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...

Python Ovito统计金刚石结构数量
大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...

AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别
【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而,传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案,能够实现大范围覆盖并远程采集数据。尽管具备这些优势…...