day16_推荐系统和总结
文章目录
- day16_推荐系统和总结
- 一、推荐实现
- 1、基于流行度推荐(掌握)
- 1.1 近期热门商品推荐
- 1.2 个人热门商品推荐
- 2、基于隐语义模型的协同过滤推荐(了解)
- 2.1 ALS算法介绍
- 2.2 推荐代码
- 3、基于物品的协同过滤推荐(了解)
- 4、基于用户的协同过滤推荐(了解)
- 5、基于关联规则的推荐(熟悉)
- 5.1 关联规则详解
- 5.2 FP-growth算法理解
- 5.3 SparkMLlib中的FP-growth算法
- 5.4 完整代码
- 6、服务部署(了解)
day16_推荐系统和总结
一、推荐实现
推荐系统一般是由Java后端与前端人员进行开发的,大数据开发人员比较少参与主要是提供数据。
为了实现推荐功能,需要启动Hadoop、Hive、ES、Doris、SparkSubmit
启动Hadoop、启动Hivecd /./up01.sh start启动ES1- 切换用户su es2- 进入目录cd /home/es/elasticsearch-7.10.2/bin3- 启动elasticsearch -d4- 退出es用户exit启动Doris/export/server/doris/fe/bin/start_fe.sh --daemon/export/server/doris/be/bin/start_be.sh --daemon/export/server/doris/apache_hdfs_broker/bin/start_broker.sh --daemon启动SparkSubmitcd /export/server/spark/sbin./start-thriftserver.sh \--hiveconf hive.server2.thrift.port=10001 \--hiveconf hive.server2.thrift.bind.host=up01 \--hiveconf spark.sql.warehouse.dir=hdfs://up01:8020/user/hive/warehouse \--master local[*]
1、基于流行度推荐(掌握)
基于流行度推荐,也就是基于统计的推荐,主要用来解决用户的冷启动问题,对于新用户首次登录。基于流行度的推荐也可以用于单独的热门商品模块。
1.1 近期热门商品推荐
可以按商品销售的单量进行倒序排序,然后存入Doirs中。表中可以多存入一些数据,在使用时,根据品类进行倒序查询,取到相关商品即可。
- 计算的sql如下
-- 近期热门商品推荐
selectcurrent_date() as recommend_date, -- 推荐时间,区别是什么时候推荐的third_category_no,third_category_name,goods_no,goods_name,count(order_no) as order_count -- 订单量
from dwm.dwm_sold_goods_sold_dtl_i
where-- 过滤最近一段时间内的销售数据datediff(current_date(),to_date(trade_date))<=40 and goods_no is not null
group by third_category_no,third_category_name,goods_no,goods_name
order by order_count desc
limit 300 -- 推荐比项目经理要求的推荐数目多一些
- Doris的建表语句如下
create database if not exists recommend_db;
CREATE TABLE IF NOT EXISTS recommend_db.popular_hot_goods (recommend_date DATE comment '计算日期',goods_no bigint comment '商品编码',third_category_no STRING comment '三级品类编码',third_category_name STRING comment '三级品类名称',goods_name STRING comment '商品名称',order_count INT comment '销售数量'
)
UNIQUE KEY(recommend_date, goods_no)
comment '热门商品推荐'
PARTITION BY RANGE(recommend_date) ()
DISTRIBUTED BY HASH(goods_no) BUCKETS 1
sql ("dynamic_partition.create_history_partition" = "true","dynamic_partition.enable" = "true","dynamic_partition.time_unit" = "DAY","dynamic_partition.start" = "-365","dynamic_partition.end" = "3","dynamic_partition.prefix" = "p","dynamic_partition.buckets" = "10","replication_allocation" = "tag.location.default: 1"
);注意: 1- 如果多个字段作为UNIQUE KEY,那么String类型不能够使用。因此这里将goods_no的类型进行了强制转换。为了将不同日期的数据分开存放,这里使用动态分区表。2- 建表中的字段顺序需要与UNIQUE KEY中字段顺序保持一致。并且UNIQUE KEY中的字段要放在最上面。如果不遵守会报如下的错:
- 推荐代码
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as Fos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.appName("topn_goods_recommend")\.master("local[*]") \.config("spark.sql.warehouse.dir", "hdfs://up01:8020/user/hive/warehouse") \.config("hive.metastore.uris", "thrift://up01:9083") \.config("spark.sql.shuffle.partitions",2)\.enableHiveSupport() \.getOrCreate()# 2- 读取分析Hive中的数据,获取TOPN的热门商品topn_goods_df = spark.sql("""selectcurrent_date() as recommend_date, -- 推荐时间,区别是什么时候推荐的third_category_no,third_category_name,goods_no,goods_name,count(order_no) as order_count -- 订单量from dwm.dwm_sold_goods_sold_dtl_iwhere-- 过滤最近一段时间内的销售数据datediff(current_date(),to_date(trade_date))<=40 and goods_no is not nullgroup by third_category_no,third_category_name,goods_no,goods_nameorder by order_count desclimit 300 -- 推荐比项目经理要求的推荐数目多一些""")# 3- 数据存储到Doris中topn_goods_df.write.jdbc(url="jdbc:mysql://192.168.88.166:9030/recommend_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",table="popular_hot_goods",mode="append",sql={ 'user' : 'root', 'password' : '123456' })# 4- 释放资源spark.stop()
1.2 个人热门商品推荐
也就是根据销售单量统计每个人喜欢购买的前N个商品
- 计算的sql如下
selectcurrent_date() as recommend_date, -- 推荐时间,区别是什么时候推荐的zt_id as user_id,goods_no,goods_name,third_category_no,third_category_name,order_count
from (select*,row_number() over(partition by zt_id order by order_count desc) as rnfrom (selectzt_id,third_category_no,third_category_name,goods_no,goods_name,count(order_no) as order_count -- 订单量from dwm.dwm_sold_goods_sold_dtl_iwhere-- 过滤最近一段时间内的销售数据datediff(current_date(),to_date(trade_date))<=40 and goods_no is not nulland zt_id is not null and zt_id!=0group by zt_id,third_category_no,third_category_name,goods_no,goods_name) tmp_1
) tmp_2 where rn<=20
- Doris建表语句
CREATE TABLE IF NOT EXISTS recommend_db.popular_person_hot_goods (recommend_date DATE comment '计算日期',user_id INT comment '会员ID',goods_no STRING comment '商品编码',goods_name STRING comment '商品名称',third_category_no STRING comment '三级品类编码',third_category_name STRING comment '三级品类名称',order_count INT comment '订单数'
)
UNIQUE KEY(recommend_date, user_id)
comment '个人热门商品推荐'
PARTITION BY RANGE(recommend_date) ()
DISTRIBUTED BY HASH(user_id) BUCKETS 1
sql ("dynamic_partition.create_history_partition" = "true","dynamic_partition.enable" = "true","dynamic_partition.time_unit" = "DAY","dynamic_partition.start" = "-365","dynamic_partition.end" = "3","dynamic_partition.prefix" = "p","dynamic_partition.buckets" = "10","replication_allocation" = "tag.location.default: 1"
);
- 推荐代码
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as Fos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.appName("topn_user_goods_recommend")\.master("local[*]") \.config("spark.sql.warehouse.dir", "hdfs://up01:8020/user/hive/warehouse") \.config("hive.metastore.uris", "thrift://up01:9083") \.config("spark.sql.shuffle.partitions",8)\.enableHiveSupport() \.getOrCreate()# 2- 读取分析Hive中的数据,获取TOPN的热门商品topn_goods_df = spark.sql("""selectcurrent_date() as recommend_date, -- 推荐时间,区别是什么时候推荐的zt_id as user_id,goods_no,goods_name,third_category_no,third_category_name,order_countfrom (select*,row_number() over(partition by zt_id order by order_count desc) as rnfrom (selectzt_id,third_category_no,third_category_name,goods_no,goods_name,count(order_no) as order_count -- 订单量from dwm.dwm_sold_goods_sold_dtl_iwhere-- 过滤最近一段时间内的销售数据datediff(current_date(),to_date(trade_date))<=40 and goods_no is not nulland zt_id is not null and zt_id!=0group by zt_id,third_category_no,third_category_name,goods_no,goods_name) tmp_1) tmp_2 where rn<=20""")# 3- 数据存储到Doris中topn_goods_df.write.jdbc(url="jdbc:mysql://192.168.88.166:9030/recommend_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",table="popular_person_hot_goods",mode="append",sql={ 'user' : 'root', 'password' : '123456' })# 4- 释放资源spark.stop()
2、基于隐语义模型的协同过滤推荐(了解)
基于隐语义模型的协同过滤方法结合了协同过滤的思想和隐语义模型的技术,通过矩阵分解等方法,将用户-项目交互矩阵分解为两个低维矩阵,分别表示用户在隐空间中的向量和项目在隐空间中的向量。
2.1 ALS算法介绍
ALS算法是2008年以来,用的比较多的协同过滤算法。它已经集成到Spark的Mllib库中,使用起来比较方便。从协同过滤的分类来说,ALS算法属于User-Item CF,也叫做混合CF。它同时考虑了User和Item两个方面。
spark.ml目前支持基于模型的协同过滤,使用交替最小二乘法(ALS)算法实现。
spark.ml的实现具有以下参数:
- numBlocks:用户和物品将被分成的块数,以便并行计算(默认为10)
- rank:模型中的潜在因子数量(默认为10)
- maxIter:运行的最大迭代次数(默认为10)
- regParam:在ALS中指定的正则化参数(默认为1.0)
- implicitPrefs:指定是否使用显式反馈ALS变体或适用于隐式反馈数据的变体(默认为false,表示使用显式反馈)
- alpha:适用于ALS隐式反馈变体的参数,决定了对偏好观察的基本置信度(默认为1.0)
- nonnegative:指定是否对最小二乘法使用非负约束(默认为false)
注意:基于DataFrame的ALS API目前仅支持整数类型的用户和物品ID。
2.2 推荐代码
如果使用基于ALS的协同过滤模型进行推荐,关键是要构造用户对商品的评分数据。评分主要来源于用户的行为,包括浏览、加购、下单、购买、退单、评论、收藏等,一般在企业中,都会将这些因素考虑进去。
具体的评分方法是:浏览 1分,加购 2分,下单 3分, 支付 5分,退单 -5分。
- Doris建表语句
CREATE DATABASE IF NOT EXISTS recommend_db;
CREATE TABLE IF NOT EXISTS recommend_db.als_goods_for_user (user_id INT comment '用户id',goods_nos STRING comment '推荐的商品列表'
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
sql("replication_num" = "1");
- 推荐代码
import os
from datetime import datetime
import numpy as np
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, DoubleTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python'def get_best_parameter(df):# 切分数据集training, test = df.randomSplit([0.8, 0.2], seed=88)# 使用ALS构建推荐模型# 将冷启动策略设置为“drop”,以确保不会获得NaN评估指标als = ALS(userCol='user_id', itemCol='goods_no', ratingCol='score', coldStartStrategy='drop')# 创建参数网格param_grid = ParamGridBuilder() \.addGrid(als.rank, [5, 10, 15]) \.addGrid(als.maxIter, [5, 10, 20]) \.addGrid(als.regParam, [0.01, 0.05, 0.1]) \.build()# 创建评估器evaluator = RegressionEvaluator(metricName='rmse', labelCol='score', predictionCol='prediction')# 创建交叉验证器crossval = CrossValidator(estimator=als,estimatorParamMaps=param_grid,evaluator=evaluator,numFolds=3) # 3折交叉验证# 训练模型cv_model = crossval.fit(training)# 选择最佳模型best_model = cv_model.bestModel# 评估最佳模型predictions = best_model.transform(test)rmse = evaluator.evaluate(predictions)print('最优模型的均方根误差为:' + str(rmse))# 获取最佳参数rank = best_model._java_obj.parent().getRank()maxIter = best_model._java_obj.parent().getMaxIter()regParam = best_model._java_obj.parent().getRegParam()print('最佳参数组合:')print('rank: ', rank)print('maxIter: ', maxIter)print('regParam: ', regParam)return rank, maxIter, regParamif __name__ == '__main__':# 1)创建整合Hive的SparkSession# 1- 创建SparkSession对象spark = SparkSession.builder \.appName("recommend") \.master("local[*]") \.config("spark.sql.warehouse.dir", "hdfs://up01:8020/user/hive/warehouse") \.config("hive.metastore.uris", "thrift://up01:9083") \.config("spark.sql.shuffle.partitions", 5) \.enableHiveSupport() \.getOrCreate()# 2)从业务库中计算历史评分select_sql = """select zt_id as user_id, goods_no, sum(if(trade_type in(2, 5), -1, 1)) * 5 as scorefrom dwm.dwm_sold_goods_sold_dtl_iwhere dt >= date_sub(current_date, 90) and dt <= date_sub(current_date, 1)and zt_id != 0 and zt_id is not nullgroup by zt_id, goods_no"""hive_df = spark.sql(select_sql)hive_df.show()# 3)读取并解析日志数据file_path = 'hdfs://up01:8020/xtzg/etl/dwd_user_event_etl_result/dt=2025-02-14'# 读取ORC格式的数据log_df = spark.read.format('orc').load(file_path).select('user_id', F.split('goods_name', '=')[0].alias('goods_no'), (F.col('is_browse')*1 + F.col('is_cart')*2 + F.col('is_order')*3 + F.col('is_buy')*5 - F.col('is_back_order')*5).alias('score') )log_df.show()# 4) 数据合并并聚合# 因为als模型中,需要userCol和itemCol都是整型,所以需要将类型转成int,又因为goods_no有0开头的,所以需要再前边拼接一个数字# 因为频次过多会导致评分过大,所以可以使用log将数据变平滑union_df = hive_df.unionAll(log_df).groupby('user_id', 'goods_no').agg(F.sum('score').alias('score')).\select(F.col('user_id').astype(IntegerType()).alias('user_id'), F.concat(F.lit('1'), F.col('goods_no')).astype(IntegerType()).alias('goods_no'), 'score')# union_df.printSchema()union_df.show()# 5) 训练模型并得到推荐结果# 获取最佳超参数# rank, maxIter, regParam = get_best_parameter(union_df)rank, maxIter, regParam = 15, 20, 0.1als = ALS(rank=rank, maxIter=maxIter, regParam=regParam, userCol='user_id', itemCol='goods_no', ratingCol='score',coldStartStrategy='drop')als_model: ALSModel = als.fit(union_df)# 为每个用户生成十大商品推荐userRecs = als_model.recommendForAllUsers(10)userRecs.printSchema()# userRecs.show(truncate=False)# 处理 goods_no,将int转为str,并去掉前缀1doris_df = userRecs.withColumn('goods_nos',F.expr("""TRANSFORM(recommendations, x -> named_struct('goods_no', substr(CAST(x.goods_no AS STRING), 2),'rating', x.rating))""")).select('user_id', F.col('goods_nos').astype(StringType()).alias('goods_nos'))doris_df.printSchema()doris_df.show(truncate=False)# 保存到 Dorisdoris_df.write.jdbc(url="jdbc:mysql://192.168.88.166:9030/recommend_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",table="als_goods_for_user",mode="append",sql={'user': 'root', 'password': '123456'})# 释放资源spark.stop()
3、基于物品的协同过滤推荐(了解)
基于物品的协同过滤就是计算出每个标的物最相似的标的物列表,然后就可以为用户推荐用户喜欢的标的物相似的标的物。 这里可以借助ALS算法生成的矩阵来完成。
离线计算的ALS 算法,算法最终会为用户、商品分别生成最终的特征矩阵,分别是表示用户特征矩阵的U(K x U)矩阵,每个用户由 K 个特征描述;表示物品特征矩阵的V(I x K)矩阵,每个物品也由 K 个特征描述。
- Doris建表语句
CREATE TABLE IF NOT EXISTS recommend_db.als_sim_goods_list (id INT comment 'id',goods_no STRING comment '商品编码',sim_goods_list STRING comment '相似的商品列表'
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
sql("replication_num" = "1");
- 推荐代码:写到ALS的后面
# 6)为每个商品生成十大用户推荐
# goodsRecs = als_model.recommendForAllItems(10)# 通过 itemFactors 获得商品的特征表达
# als_model.itemFactors.show(truncate=False)
# 获取商品id及对应的特征表达
item_factors_df = als_model.itemFactors.select(F.expr("substr(cast(id as string), 2) as goods_no"), 'features')
item_factors_df.show(truncate=False)# 定义计算余弦相似度的 UDF
def consin_sim(vec1, vec2):vec1 = np.array(vec1)vec2 = np.array(vec2)num = np.dot(vec1, vec2)# np.linalg.norm()用于求范数,默认是二范数denom = np.linalg.norm(vec1) * np.linalg.norm(vec2)if denom == 0:return 0.0return round(float(num / denom), 4)consin_sim_udf = F.udf(consin_sim, DoubleType())# item_factors_df自关联,计算相似度,再将相似度小于0.75的过滤掉
cartesian_goods_df = item_factors_df.alias('df1') \.crossJoin(item_factors_df.alias('df2')) \.filter(F.col('df1.goods_no') != F.col('df2.goods_no')) \.withColumn('simScore', consin_sim_udf(F.col('df1.features'), F.col('df2.features'))) \.filter('simScore >= 0.75')# 按照 goods_no 进行分组并构建推荐结果
goods_recs_df = cartesian_goods_df.groupBy('df1.goods_no') \.agg(F.collect_list(F.struct(F.col('df2.goods_no').alias('rec_goods_no'), F.col('simScore').alias('score'))).alias('rec_goods_nos'))# 对相似的goods_no列表进行排序,并选取前10个【因为个别商品相似的商品太多,所以只保留10个即可】
# 使用 expr 和 array_sort 函数进行排序,并使用 slice 函数只保留前10个元素
sorted_df = goods_recs_df.withColumn('sim_goods_list',F.expr("slice(array_sort(rec_goods_nos, (x, y) -> case when x.score > y.score then -1 when x.score < y.score then 1 else 0 end), 1, 10)")
).withColumn('id',F.expr("cast(concat('1', goods_no) as int)")
).select('id', 'goods_no', F.col('sim_goods_list').astype(StringType()).alias('sim_goods_list'))# 显示结果
sorted_df.printSchema()
sorted_df.show(truncate=False)# 保存到 Doris
write_to_doris(sorted_df, 'recommend_db.als_sim_goods_list')
4、基于用户的协同过滤推荐(了解)
UserCF算法主要是考虑用户与用户之间的相似度,给用户推荐和他兴趣相似的其他用户喜欢的物品。俗话说"物以群分,人以类聚",人们总是倾向于跟自己志同道合的人交朋友。同理,你朋友喜欢的东西你大概率也可能会喜欢,UserCF算法正是利用了这个原理。
- Doris建表语句
CREATE TABLE IF NOT EXISTS recommend_db.user_cf_goods_for_user (user_id INT comment '用户id',goods_nos STRING comment '推荐的商品列表'
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
sql("replication_num" = "1");
- 推荐代码
import osfrom pyspark.ml.feature import CountVectorizer, Normalizer
from pyspark.sql import DataFrame, Window, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, ArrayType, StructType, StructField, StringType, FloatTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder \.appName("recommend") \.master("local[*]") \.config("spark.sql.warehouse.dir", "hdfs://up01:8020/user/hive/warehouse") \.config("hive.metastore.uris", "thrift://up01:9083") \.config("spark.sql.shuffle.partitions", 5) \.enableHiveSupport() \.getOrCreate()# 2)从es读取标签数据并将标签数据合并es_df = spark.read.format("es") \.option("es.nodes", "192.168.88.166:9200") \.option("es.resource", "user_profile_tags") \.option("es.read.field.include", "user_id,tags_id_times,tags_id_once,tags_id_streaming") \.load()temp_df = es_df.select('user_id', F.concat(F.coalesce('tags_id_times', F.lit('')), F.lit(','),F.coalesce('tags_id_once', F.lit('')), F.lit(','),F.coalesce('tags_id_streaming', F.lit(''))).alias('tags'))temp_df.show()# 3)将标签数据转换成向量# 使用split函数将字符串切分成数组,然后使用filter将''的元素过滤掉tags_df = temp_df.select('user_id', F.split('tags', ',').alias('tags')).select('user_id', F.expr("filter(tags, x -> x != '')").alias('tags'))# tags_df.show(truncate=False)# 将标签数组转换为向量cv = CountVectorizer(inputCol='tags', outputCol='features')model = cv.fit(tags_df)user_df = model.transform(tags_df)# 因为数据量比较大,容易运行不出来,所以可以抽样,取少量数据# user_df = user_df.sample(fraction=0.01, seed=66)user_df.show(truncate=False)# 4)计算用户相似度# 标准化向量normalizer = Normalizer(inputCol="features", outputCol="norm_features")norm_df = normalizer.transform(user_df).select('user_id', 'norm_features')norm_df.printSchema()# norm_df.show(truncate=False)# 将稀疏向量列转换为稠密向量列def to_dense(vector):return vector.toArray().tolist()to_dense_udf = F.udf(to_dense, ArrayType(DoubleType()))dense_df = norm_df.withColumn('dense_features', to_dense_udf('norm_features'))# dense_df.show(truncate=False)# 计算用户之间的余弦相似度join_df = dense_df.alias('u1').join(dense_df.alias('u2'), F.col('u1.user_id') != F.col('u2.user_id')) \.select(F.col('u1.user_id').alias('user1'), F.col('u2.user_id').alias('user2'),F.col('u1.dense_features').astype(ArrayType(FloatType())).alias('f1'),F.col('u2.dense_features').astype(ArrayType(FloatType())).alias('f2'))user_sim = join_df.select('user1', 'user2', F.zip_with('f1', 'f2', lambda x, y: x * y).alias('f3')) \.withColumn('cosine_sim', F.round(F.aggregate('f3', F.lit(0.0), lambda acc, x: acc + x), 4)) \.select('user1', 'user2', 'cosine_sim')# print('-----------------------',user_sim.count(),'------------------------------')user_sim.printSchema()# user_sim.show(truncate=False)# 5)获取每个用户最相似的10个用户# 定义窗口函数windowSpec = Window.partitionBy('user1').orderBy(F.col('cosine_sim').desc())# 取rn前10的列rn_df = user_sim.withColumn('rn', F.row_number().over(windowSpec)).filter('rn <= 10')# rn_df.show(truncate=False)# 6)查询每个用户评分最高的商品# 计算评分时,因为不同用户购买频次不同会导致评分差距过大,在进行商品推荐时,该评分对结果影响很大,所以可以对score使用log函数,将这种变化变平缓些select_sql = """select user_id, goods_no, round(log(score), 3) as scorefrom (selectuser_id, goods_no, score, rank() over(partition by user_id order by score desc) as rnfrom (select zt_id as user_id, goods_no, sum(if(trade_type in(2, 5), -1, 1)) * 5 as scorefrom dwm.dwm_sold_goods_sold_dtl_iwhere dt >= date_sub(current_date, 90) and dt <= date_sub(current_date, 1)and zt_id != 0 and zt_id is not nullgroup by zt_id, goods_no) tmpwhere score > 0 -- score为0或负的不推荐) twhere rn <= 10"""hive_df = spark.sql(select_sql)# hive_df.show()# 按照 goods_no 进行分组并构建推荐结果prefer_goods_df = hive_df.groupBy('user_id') \.agg(F.collect_list(F.struct('goods_no', 'score')).alias('prefer_goods_nos'))# prefer_goods_df.show(truncate=False)# 7)用户关联商品,给用户进行推荐# 关联商品join_df = rn_df.join(prefer_goods_df, rn_df['user1'] == prefer_goods_df['user_id'], 'inner').select('user1', 'user2', 'cosine_sim', F.col('prefer_goods_nos').alias('user1_goods_no'))join_df = join_df.join(prefer_goods_df, join_df['user2'] == prefer_goods_df['user_id'], 'inner').select('user1', 'user2', 'cosine_sim', 'user1_goods_no', F.col('prefer_goods_nos').alias('user2_goods_no'))# join_df.show(truncate=False)join_df.printSchema()# 定义一个udf,将cosine_sim,user1_goods_no和user2_goods_no都传进去,去掉user2_goods_no中的user1_goods_no,并计算user2_goods_no的分数def calculate_score(cosine_sim, user1_goods_no, user2_goods_no):user1_goods = [item.goods_no for item in user1_goods_no]user2_goods = []for item in user2_goods_no:if item['goods_no'] not in user1_goods:user2_goods.append({'goods_no': item['goods_no'], 'score': round(item['score'] * cosine_sim, 3)})return user2_goods# 返回值类型schema = ArrayType(StructType([StructField('goods_no', StringType(), nullable=False),StructField('score', DoubleType(), nullable=False)]))calculate_score_udf = F.udf(calculate_score, schema)# 获取用户及推荐的商品rec_df = join_df.select(F.col('user1').alias('user_id'), calculate_score_udf('cosine_sim', 'user1_goods_no', 'user2_goods_no').alias('rec_goods')).\filter(F.size(F.col('rec_goods')) > 0)# rec_df.show(truncate=False)# 展开 rec_goods 中的元素,然后按照 user_id 进行分组并聚合成列表goods_recs_df = rec_df.withColumn('goods_no_element', F.explode(F.col('rec_goods'))).groupBy('user_id').agg(F.collect_list('goods_no_element').alias('rec_goods_nos'))# 对推荐的goods_no列表进行排序,并选取前10个【因为个别用户推荐的商品太多,所以只保留10个即可】# 使用 expr 和 array_sort 函数进行排序,并使用 slice 函数只保留前10个元素sorted_df = goods_recs_df.withColumn('rec_goods_list',F.expr('slice(array_sort(rec_goods_nos, (x, y) -> case when x.score > y.score then -1 when x.score < y.score then 1 else 0 end), 1, 10)')).select('user_id', F.col('rec_goods_list').astype(StringType()).alias('goods_nos'))# sorted_df.show(truncate=False)# 8)结果保存# 保存到 Dorissorted_df.write.jdbc(url="jdbc:mysql://192.168.88.166:9030/recommend_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",table="user_cf_goods_for_user",mode="append",sql={'user': 'root', 'password': '123456'})spark.stop()
5、基于关联规则的推荐(熟悉)
5.1 关联规则详解
- 什么是关联规则(Association Rules)?
答:关联规则是数据挖掘中的概念,通过分析数据,找到数据之间的关联。电商中经常用来分析购买物品之间的相关性,例如,“购买尿布的用户,有大概率购买啤酒”,这就是一个关联规则。
- 什么是关联规则推荐(Association Rule Based Recommendaion)?
答:顾名思义,利用关联规则,来实施推荐。关联规则推荐的目标,是希望达到“将尿布放入购物车之后,再推荐啤酒”比“直接推荐啤酒”获取有更好的售卖效果。
- 关联规则推荐的典型应用?
线下,可以将尿布和啤酒放在一起;
线上,可以在用户将尿布放入购物车后,立刻推荐啤酒。
- 如何实施?
假设某电商会售卖ABCD四种商品,历史上共5笔订单,分别卖出{A,B,C}, {B,C,D}, {A,B,C,D}, {A,C}, {C}
5.2 FP-growth算法理解
常用的算法有Aprior算法和FP-growth算法,FP-growth算法比Apriori算法效率更高,并且在PySpark中对FP-growth算法进行了实现,所以这里重点讲一下FP-growth算法原理。
FP-growth(Frequent Pattern Tree, 频繁模式树),是韩家炜老师提出的挖掘频繁项集的方法,是将数据集存储在一个特定的称作FP树的结构之后发现频繁项集或频繁项对,即常在一块出现的元素项的集合FP树。
5.3 SparkMLlib中的FP-growth算法
spark.ml中提供了FPGrowth()方法来实现FP-growth算法。spark.ml的FP-growth实现接受以下(超)参数:
- minSupport:将一个项目集识别为频繁项目集的最低支持度。例如,如果一个项目在5个事务中出现了3次,它的支持度就是3/5=0.6。
- minConfidence:生成关联规则的最低置信度。置信度是表明一个关联规则被发现为真的频率。例如,如果在事务中项目集X出现了4次,而X和Y共同出现了2次,则规则X => Y的置信度为2/4=0.5。该参数不会影响频繁项目集的挖掘,但会指定从频繁项目集中生成关联规则的最低置信度。
- numPartitions:用于分配工作的分区数。默认情况下,该参数未设置,使用输入数据集的分区数。
模型训练完成后,会生成 FPGrowthModel 对象。FPGrowthModel 提供以下方法或属性:
-
freqItemsets:频繁项目集,以包含以下列的数据框格式提供:
- items:array:一个给定的项目集。
- freq:long:根据配置的模型参数,该项目集出现的次数。
-
associationRules:生成的置信度高于 minConfidence 的关联规则,以包含以下列的数据框格式提供:
- antecedent:array:作为关联规则假设的项目集。如果关联规则为A->B,则 antecedent 为 A。
- consequent:array:总是包含一个元素的项目集,代表关联规则的结论。如果关联规则为A->B,则 antecedent 为 B。
- confidence:double:置信度,定义参见上文中的 minConfidence。
- lift:double:提升度,计算方法为 support(antecedent ∪ consequent) / (support(antecedent) x support(consequent))。
- support:double:频繁项目集的支持度,定义参见上文中的 minSupport。
-
transform:根据传入的items,比对关联规则,将符合规则的结果添加到预测结果中。transform 方法将汇总所有适用规则的结果作为预测。预测列的数据类型与 items 列相同,并且不包含 items 列中的现有项目。
5.4 完整代码
- Doris建表语句
CREATE TABLE IF NOT EXISTS recommend_db.fpgrowth_association_goods
( `calculate_date` DATETIME COMMENT "计算时间",`antecedent` ARRAY<STRING> COMMENT "购买的商品",`consequent` ARRAY<STRING> COMMENT "关联(推荐)商品",`confidence` DOUBLE COMMENT "置信度",`lift` DOUBLE COMMENT "提升度",`support` DOUBLE COMMENT "支持度"
)
DUPLICATE KEY(calculate_date)
comment '关联规则推荐'
PARTITION BY RANGE(calculate_date) ()
DISTRIBUTED BY HASH(calculate_date) BUCKETS 1
sql ("dynamic_partition.create_history_partition" = "true","dynamic_partition.enable" = "true","dynamic_partition.time_unit" = "DAY","dynamic_partition.start" = "-365","dynamic_partition.end" = "3","dynamic_partition.prefix" = "p","dynamic_partition.buckets" = "10","replication_allocation" = "tag.location.default: 1"
);因为在doris中只是为了备份,所以存储成冗余模型即可。为了区分不同时间计算的结果,在表中添加了calculate_date字段,作为区分。然后为了分区存储,使用了动态分区的方式。
- 推荐代码
from pyspark.ml.fpm import FPGrowth, FPGrowthModel
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, StructType, StructField, ArrayType
from tags.utils.hdfs_utils import HDFSUtilos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 实现商品推荐的功能
def get_recommend_goods(current_goods_no, spark, fpg_model):schema = StructType([StructField("items",ArrayType(StringType()))])current_goods_no_df = spark.createDataFrame(data=[(current_goods_no,)],schema=schema)# 使用模型进行商品推荐result_df = fpg_model.transform(current_goods_no_df)result_df.show()result_df.printSchema()print(result_df.collect())# 返回最终的推荐商品IDreturn result_df.collect()[0][1]# 基于关联规则的推荐
if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.appName("fp_growth")\.master("local[*]") \.config("spark.sql.warehouse.dir", "hdfs://up01:8020/user/hive/warehouse") \.config("hive.metastore.uris", "thrift://up01:9083") \.config("spark.sql.shuffle.partitions",5) \.enableHiveSupport() \.getOrCreate()# 2- 数据输入:分析商品间的关联关系order_df = spark.sql("""selectorder_no,collect_set(goods_no) as items -- 将当前订单下的多个商品合到一个Set集合中from dwm.dwm_sold_goods_sold_dtl_iwhere datediff(current_date(),to_date(trade_date))<=40and goods_no is not nulland parent_order_no is not nulland order_no is not nullgroup by order_no""")# 3- 通过FP-growth分析商品间的关联关系的频率path = "/xtzg/recommend/fpg"if HDFSUtil().exists(path):# 如果之前已经训练好了模型,那么直接加载出来使用即可fpg_model = FPGrowthModel.load("hdfs://192.168.88.166:8020"+path)else:# 3.1- 创建算法模型实例对象fpGrowth = FPGrowth(itemsCol="items", minSupport=0.001, minConfidence=0.6)# 3.2- 对算法模型使用数据进行训练fpg_model = fpGrowth.fit(order_df)# 3.3- 再将训练好的模型存储到HDFSfpg_model.save("hdfs://192.168.88.166:8020"+path)rule_result = fpg_model.associationRulesrule_result.show(n=100)rule_result.printSchema()# 4- 模型训练后的商品关联信息存放到Doris中# doris_df = rule_result.withColumn("calculate_date",F.current_timestamp())doris_df = rule_result.select(F.current_timestamp().alias("calculate_date"),rule_result.antecedent.cast(StringType()).alias("antecedent"),rule_result.consequent.cast(StringType()).alias("consequent"),"confidence","lift","support")doris_df.write.jdbc(url="jdbc:mysql://192.168.88.166:9030/recommend_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",table="fpgrowth_association_goods",mode="append",sql={ 'user' : 'root', 'password' : '123456' })# 5- 使用训练好的模型来进行商品的推荐:这里是模拟后面顾客来购买东西的时候,进行推荐的效果current_goods_no = ["3224064"]recommend_goods = get_recommend_goods(current_goods_no, spark, fpg_model)print(recommend_goods)# 6- 释放资源spark.stop()
可能遇到的错误:
原因: 将数据输入到Doris的array字段中的时候,需要在输入前将对应的数据格式进行类型转换为字符串
6、服务部署(了解)
配置好get_recommend_goods方法后,每次调用都需要spark session和fpg_model,如果每次调用都新建非常浪费资源,时效也会非常差。所以这里需要将get_recommend_goods方法布置成接口服务。服务启动后,则可以只实例化一个spark session和fgp_model,并实时响应查询推荐商品的请求
这里借助Flask来实现。Flask是一个用Python编写的Web应用程序框架。Flask中文官网:https://dormousehole.readthedocs.io/en/latest/
Flask安装命令
pip install Flask -i https://mirrors.aliyun.com/pypi/simple/
Flask代码
import ast
import os
from flask import Flask, request
from pyspark.ml.fpm import FPGrowthModel
from pyspark.sql import SparkSessionfrom tags.recommend.fpgrowth_association_goods import get_recommend_goodsos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python'# 初始化SparkSession
spark = SparkSession.builder \.config("spark.sql.shuffle.partitions", 5) \.appName("recommend_api") \.getOrCreate()# 加载模型
hdfs_path = "/xtzg/recommend/fpg"
fpg_model: FPGrowthModel = FPGrowthModel.load("hdfs://192.168.88.166:8020"+hdfs_path)app = Flask(__name__)@app.route('/recommend')
def recommend():# 处理get请求,获取?后边的参数data = request.args.to_dict()# print(data)# 使用literal_eval将字符串转换为listgoods_list = ast.literal_eval(data['goods_list'])print('-----数据来了:', goods_list)# 方法调用recommended_goods = get_recommend_goods(goods_list, spark, fpg_model)return recommended_goods@app.route('/')
def hello_world():return '欢迎来到小兔智购商品推荐系统'if __name__ == "__main__":app.run(host='0.0.0.0', port=5000)
启动Flask,然后可以看到生成的URL。
访问:
这里模拟发送一个get请求,在url后加?,然后是key=value,如下
http://192.168.88.166:5000/recommend?goods_list=[‘3215330’]
则可以得到响应的结果
在工作中,由后端程序调用接口,得到响应结果后,再发生给前端进行渲染,生成推荐结果。
相关文章:

day16_推荐系统和总结
文章目录 day16_推荐系统和总结一、推荐实现1、基于流行度推荐(掌握)1.1 近期热门商品推荐1.2 个人热门商品推荐 2、基于隐语义模型的协同过滤推荐(了解)2.1 ALS算法介绍2.2 推荐代码 3、基于物品的协同过滤推荐(了解&…...

一文说清楚编码、摘要、加密、公钥、私钥、解密、签名、验签
编码 对信息进行编码,没有信息损失,任何人都能通过编码方式对信息进行解码。例如 ASCII 码,base64 编码。 例如下面是 ASCII 编码表: 摘要 对信息计算摘要值,有信息损失,例如 md5 摘要,sh…...

Repeated Sequence
记suma[1]a[2]a[3]...a[n]。 该序列以a[1],a[2],a[3]....a[n]为循环节,明显的,问题可转化为:s%sum是否为该序列的某个连续子序列和。 断环为链。将a复制一份。 枚举a[i]为左端点的所有区间的和。再查找s是否存在。二分O&#x…...

CT dicom 去除床板 去除床位,检查床去除
1. 前言 医院拍摄患者CT与MRI 图像, 但是CT图像中就会出现检查床的区域,来看CT扫描设备是什么样子的,红色标出区域 可以在图中看到,在头部位置安装有固定头部的类似支架的东西,这个东西拍摄出来时什么样子呢ÿ…...

react 踩坑记 too many re-renders.
报错信息: too many re-renders. React limits the number of randers to prevent an infinite loop. 需求 tabs只有特定标签页才展示某些按钮 button要用 传递函数引用方式 ()>{} *还有要注意子组件内loading触发 导致的重复渲染...

YOLOv8与BiFormer注意力机制的融合:提升多场景目标检测性能的研究
文章目录 1. YOLOv8的改进背景2. BiFormer注意力机制的核心原理2.1 Bi-level Attention结构2.2 路由策略与加权融合 3. YOLOv8与BiFormer的结合3.1 YOLOv8架构概述3.2 BiFormer与YOLOv8的融合策略 4. 实现代码示例5. 结果分析与实验5.1 数据集与实验设置5.2 实验结果 6. 进一步…...

Ubuntu24.04LTS的下载安装超细图文教程(VMware虚拟机及正常安装)
😸个人主页👉:神兽汤姆猫 📖系列专栏:开发语言环境配置 、 Java学习 、Java面试 、Markdown等 学习上的每一次进步,均来自于平时的努力与坚持。 💕如果此篇文章对您有帮助的话,请点…...

c++贪心系列
各位小伙伴们新年好呀,这是年后的第一篇文章,那么还是一样,我们继续学习这个贪心算法。 第一题 题目链接 2418. 按身高排序 - 力扣(LeetCode) 题目解析 代码原理 方法一 1.先创建一个下标数组,将两个数…...
爬虫第七篇数据爬取及解析
这篇博客旨在分享学习过程中的心得和体会,如果有错误请指出,感谢大家。 经过前面的学习,那么我们也就进入了数据爬取的阶段,大家跟着我的步伐一起来学习一下,爬虫的数据爬取与数据解析(本篇主要针对于带有…...

LangChain 技术入门指南:探索语言模型的无限可能
在当今的技术领域,LangChain 正逐渐崭露头角,成为开发语言模型应用的强大工具。如果你渴望深入了解并掌握这一技术,那么就跟随本文一起开启 LangChain 的入门之旅吧! (后续将持续输出关于LangChain的技术文章,有兴趣的同学可以关注…...

解锁D3.js与PlantUML的交互奥秘:探索知识图谱数据可视化新领域
解锁D3.js与PlantUML的交互魔法:数据可视化新征程 在前端开发的广袤天地里,数据可视化一直是一颗璀璨的明珠,吸引着无数开发者探索其奥秘。而当D3.js这一强大的JavaScript库,遇上专注于创建UML图的PlantUML,一场奇妙的…...
OpenCV机器学习(8)随机森林(Random Forests)算法cv::ml::RTrees类
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 cv::ml::RTrees 是 OpenCV 机器学习模块中的一部分,用于实现随机森林(Random Forests)算法。随机森林是一种集…...
Java四大框架深度剖析:MyBatis、Spring、SpringMVC与SpringBoot
目录 前言: 一、MyBatis框架 1. 概述 2. 核心特性 3. 应用场景 4. 示例代码 二、Spring框架 1. 概述 2. 核心模块 3. 应用场景 4. 示例代码 三、SpringMVC框架 1. 概述 2. 核心特性 3. 应用场景 4. 示例代码 四、SpringBoot框架 1. 概述 2. 核心…...

MySQL系列之身份鉴别(安全)
导览 前言Q:如何保障MySQL数据库身份鉴别的有效性一、有效性检查 1. 用户唯一2. 启用密码验证3. 是否存在空口令用户4. 是否启用口令复杂度校验5. 是否设置口令的有效期6. 是否限制登录失败尝试次数7. 是否设置(超过尝试次数)锁定的最小时长…...

纯手工搭建整套CI/CD流水线指南
目录 一、前言 二、环境准备 1、服务器开荒(192.168.1.200) 2、离线资源清单(提前用U盘拷好) 三、硬核安装:比拧螺丝还细的步骤 Step1:搭建GitLab(注意!这是只内存饕餮…...

侯捷 C++ 课程学习笔记:C++ 基础与演化
一、课程基础要求 在侯捷老师C 课程中,首先强调了学习 C 前应具备的基础知识。这些基础知识对于理解 C 的核心概念和编程技巧至关重要。 掌握某种过程式语言(C 语言最佳): 变量(Variables):理解…...
LangChain:AI大模型开发与分布式系统设计
文章目录 第一部分:大模型与 LangChain 基础1.1 大语言模型概述1.2 LangChain 基础 第二部分:模型初始化与调用2.1 自定义大模型架构 第三部分:高级模型设计与优化3.1 提示工程与模型调优3.2 高效处理大规模数据 第四部分:分布式系…...

AI赋能编程:PyCharm与DeepSeek的智能开发革命
在这个智能化的时代,人工智能技术正在深刻地改变着我们的工作方式,尤其是在编程领域。无论是初学者还是资深开发者,都希望借助更高效的工具和智能助手来提升生产力、优化代码质量。今天,我们将聚焦于两个强大的工具:Py…...

c++:stack与deque
1.stack使用 1.1empty 作用:判断栈中是否为空 我们看到这里s1初始化的时候是空初始化,所以用empty来判断出的就是空的栈 1.2size size的作用就是判断栈中的数据个数 1.3push 与vector,string,list不同的是,stack中没有头插尾插的概念 因为栈有一个原则&…...

Linux-C/C++《C++/1、C++基础》(C++语言特性、面向对象等)
这里主要介绍概念为主,主要介绍 C与 C 语言中常用的不同点,和一些新的变化。其中不会去说指针、数据类型、变量类型、判断和循环等这些知识,这些和C 语言基本是一样使用的。我们主要学习 C的面向对象编程,对学习 Qt 有很大的帮助。…...
Python|GIF 解析与构建(5):手搓截屏和帧率控制
目录 Python|GIF 解析与构建(5):手搓截屏和帧率控制 一、引言 二、技术实现:手搓截屏模块 2.1 核心原理 2.2 代码解析:ScreenshotData类 2.2.1 截图函数:capture_screen 三、技术实现&…...
java_网络服务相关_gateway_nacos_feign区别联系
1. spring-cloud-starter-gateway 作用:作为微服务架构的网关,统一入口,处理所有外部请求。 核心能力: 路由转发(基于路径、服务名等)过滤器(鉴权、限流、日志、Header 处理)支持负…...

23-Oracle 23 ai 区块链表(Blockchain Table)
小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...
uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖
在前面的练习中,每个页面需要使用ref,onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入,需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...

渗透实战PortSwigger靶场-XSS Lab 14:大多数标签和属性被阻止
<script>标签被拦截 我们需要把全部可用的 tag 和 event 进行暴力破解 XSS cheat sheet: https://portswigger.net/web-security/cross-site-scripting/cheat-sheet 通过爆破发现body可以用 再把全部 events 放进去爆破 这些 event 全部可用 <body onres…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...

React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...

高等数学(下)题型笔记(八)空间解析几何与向量代数
目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)
引言:为什么 Eureka 依然是存量系统的核心? 尽管 Nacos 等新注册中心崛起,但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制,是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...