机器学习-电影推荐

by LINKTIME CLOUD 2021-05-25

本项目展示了如何处理一个包含用户对不同电影的评分的数据集,针对数据集中的用户对某电影评分进行预测,并对于用户可能感兴趣的电影进行推荐。

数据集数据来源:GroupLens 官方网站,数据集来源:movielens_link。文件包含2017年7月或之前发行的电影数据4.5万条,包含来自27万位用户对于4.5万部电影的2600万个评分,评分范围为0-5。

备注:因此Demo项目设计方案的调整,录屏列表视图中出现的HDFS到Hive步骤已删除,但不影响此Demo的其他步骤及其运行结果。

本介绍以视频的形式,向大家展示一个机器学习通过ALS模型进行电影个性化推荐的实例,过程包括数据采集、数据处理和数据分析这几个步骤。

完整步骤内容文档下载

步骤一:新建个人/机构项目

用户点击界面上创建新项目,填写名称可参照下图、

1
1

步骤二:添加项目步骤

从当前项目步骤中进行添加,点击各个目录中的具体操作,依照该方法,分别添加以下几个步骤:

1.数据采集–URL文件导入

2.数据分析–jupyterNotebook

3

添加完成后,对步骤进行修改名称,以区分,可以参照下图进行修改。

2

步骤三:URL 文件导入

在BDOS Online大数据平台,用户可通过URL文件导入,导入到系统的HDFS。

电影名称数据集Web下载路径: http://linktime-public.oss-cn-qingdao.aliyuncs.com/Project_online/Jupyter/movies_metadata.csv

电影评分数据集Web下载路径: http://linktime-public.oss-cn-qingdao.aliyuncs.com/Project_online/Jupyter/movie_ratings_demo.csv

文件名称:xxx.csv(用户选填,不填则默认为url链接包含的文件名,用户也可参照示例进行名称自定义并带上文件后缀)

HDFS目录选择:保持默认

3

步骤四:对数据进行处理和导出至Hive

进入JupyterLab,新建PySpark notebook,并在PySpark程序步骤对电影测评demo数据进行处理并导出至Hive。

步骤一操作

注:

机构项(xxx/xxx为org/xxx) 1.替换org/xxx的xxx为机构名称 2.若URL文件导入步骤填入自定义名称,则替换csv为实际csv表名,否则不用修改csv表名

个人项(xxx/xxx为user/xxx) 1.替换user/xxx的xxx为当前登录用户名 2.若URL文件导入步骤填入自定义名称,则替换csv为实际csv表名,否则不用修改csv表名

data=spark.read.csv('hdfs://default/user/beta/movie_ratings_demo1108.csv',header=True)
name=spark.read.csv('hdfs://default/user/beta/movies_metadata1108.csv', header=True)
name=name.select("id","title")
for col in data.columns:
  data = data.withColumnRenamed(col, col.lower())
for col in name.columns:
  name = name.withColumnRenamed(col, col.lower())
data.show(1)
name.show(1)
步骤一说明

1.以PySpark的格式读取导入到Hive目标表的实验数据到PySpark dataframe

2.使用function data.show() 将dataframe 的内容进行展示。如:show(1),即展示数据集第1行,不填则默认展示前20行。

步骤二操作
data = data.join(name, data.movieid == name.id)
data=data.select("userid","movieid","rating","timestamp","title")
data.show(1)
步骤二说明

将电影评分数据集与电影名称数据集进行关联

步骤三操作
data=data.distinct()
data = data.dropDuplicates(subset=[c for c in data.columns if c in ["userid","movieid"]])
data.count()
步骤三说明

1.筛选重复数据,如:使用function data.distinct() 对完全相同的行进行去重。

2.进一步筛选重复数据,如:在 userId和movieId相同的情况下,只允许一条评分存在。

3.数据统计,如:使用function data.count() 统计数据集行数。

步骤四操作
import pyspark.sql.functions as f
data.agg(*[(1-(f.count(c) /f.count('*'))).alias(c+'_missing') for c in data.columns]).show()
data=data.na.drop(subset=['rating'])
data=data.dropna(thresh=2)
步骤四说明

1.打印每列数据的空缺比。

2.删除Class项结果项空缺的数据,如:(data.na.drop(subset=[‘rating’])),对“rating”项缺失的行进行删除。

3.删除非Class项空缺数>=2的行数据,如对 thresh 进行参数设置来控制判断空缺项的阈值。

步骤五操作
data=data.withColumnRenamed('userid', 'user')
data=data.withColumnRenamed('movieid', 'movie')
data
步骤五说明

对列进行重命名,如:使用function withColumnRenamed()对列进行自定义的命名。

步骤六操作
data= data.withColumn("user",data['user'].cast('int'))
data= data.withColumn("rating",data['rating'].cast('int'))
data= data.withColumn("movie",data["movie"].cast('int'))
步骤六说明

1.对数据类型进行转换,如:使用 function withColumn() 对数据类型进行转换,将string类型数据转换成double类型数据。

步骤七操作
from pyspark.sql.functions import col
data.groupBy("rating").count().orderBy(col("count").desc()).show(truncate=False)
data=data.filter(( data.rating== 0)|( data.rating== 1)|(data.rating == 2) | (data.rating == 3) | (data.rating == 4)|(data.rating == 5))
步骤七说明

筛选满足打分要求的结果项,即通过 function filter()筛选 rating 项为0或1的数据。

步骤八操作

注:

机构项目(xxx.xxx为org_xxx)

1.替换org_xxx的xxx为机构名称 2.table1为Hive目标表名,用户可自定义目标表名。

个人项目(xxx_xxx为user_xxx)

1.替换user_xxx的xxx为当前登录用户名 2.table1为Hive目标表名,用户可自定义目标表名。

data.write.format("hive").mode("overwrite").saveAsTable("xxx_xxx.table1")
print('success')

步骤八说明

将结果数据导出到目标Hive库表

具体操作可以参照下图。

4
image-20211108153143748

步骤五:ALS模型预测与调参

再次在JupyterLab中新建PySpark notebook,在PySpark程序中进行ALS模型预测与调参。

步骤一操作

注: 机构项目(xxx_xxx为org_xxx)

1.替换org_xxx的xxx为机构名称 2.table2为实际Hive目标表名

个人项(xxx_xxx为user_xxx) 1.替换user_xxx的xxx为当前登录用户名 2.table2为实际Hive目标表名

df=spark.sql("select * from xxx_xxx.table1")
(train, test) = df.randomSplit([0.8, 0.2],seed = 11)
print('success')
步骤一说明

导入上一个步骤的输出到Jupyter,并将数据分成测试集和训练集。

步骤二操作
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
als= ALS(userCol="user",itemCol="movie",ratingCol="rating",nonnegative=True,implicitPrefs=False, coldStartStrategy="drop",maxIter=10)
als_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="rating",metricName="rmse")
param_grid = ParamGridBuilder() \
            .addGrid(als.regParam, [ .01]) \
            .build()
#, .05, .1, .15
tvs = TrainValidationSplit(estimator=als,
                           estimatorParamMaps=param_grid,
                           evaluator=als_evaluator,
                           trainRatio = 0.8)
model=tvs.fit(train)
model=model.bestModel
als_prediction=model.transform(test)
evaluator = RegressionEvaluator(metricName="mae", labelCol="rating",
                                predictionCol="prediction")
​mae = evaluator.evaluate(als_prediction)
print("mean absolute error = " + str(mae))
步骤二说明

ALS模型的调参与预测

  • 运用TrainValidationSplit(TVS)来进行参数调优。
  • 运用param_grid方程来定义TVS检测的参数。
步骤三操作

注:

机构项目: 1.”hdfs:///xxx/xxx/data/mllib/alsmodel” 为hdfs:///org/xxx/data/mllib/alsmodel,xxx替换为当前机构名。 2.”xxx_xxx.table2″为org_xxx,xxx替换为当前机构名,table2为Hive目标表名,用户可自定义目标表名。

个人项目: 1.”hdfs:///xxx/xxx/data/mllib/alsmodel” 为hdfs:///user/xxx/data/mllib/alsmodel,xxx替换为当前登录用户名。 2.”xxx_xxx.table2″为user_xxx,xxx替换为当前登录用户名,table2为Hive目标表名,用户可自定义目标表名。

model.write().overwrite().save("hdfs:///xxx/xxx/data/mllib/alsmodel") 
als_prediction.write.saveAsTable("xxx_xxx.table2", format="orc", mode="overwrite")
print('success')

步骤三说明

将训练完成的最佳模型存入HDFS以便后续使用。

将最佳预测结果存入Hive目标表中。

具体操作可以参照下图。

6
7

步骤六:电影个性化推荐

再次进入JupyterLab中新建PySpark notebook,在PySpark程序中进行电影个性化推荐。

步骤一操作

注:

机构项目: 1.”hdfs:///xxx/xxx/data/mllib/alsmodel” 为hdfs:///org/xxx/data/mllib/alsmodel,xxx替换为当前机构名。 2.”xxx_xxx.table1″为org_xxx,xxx替换为当前机构名,table1为Hive目标表名,用户可自定义目标表名。 3.”xxx/xxx/movies_metadata.csv'”为org/xxx,xxx替换为当前机构名,movies_metadata.csv替换为用户自定义的hfds表名。

个人项目: 1.”hdfs:///xxx/xxx/data/mllib/alsmodel” 为hdfs:///user/xxx/data/mllib/alsmodel,xxx替换为当前登录用户名。 2.”xxx_xxx.table1″为user_xxx,xxx替换为当前登录用户名,table1为Hive目标表名,用户可自定义目标表名。 3.”xxx/xxx/movies_metadata.csv'”为user/xxx,xxx替换为当前登录用户名,movies_metadata.csv替换为用户自定义的hfds表名。

from pyspark.ml.recommendation import ALSModel
model_path = "hdfs:///xxx/xxx/data/mllib/alsmodel"
als_model =ALSModel.load(model_path)
df=spark.sql("select * from xxx_xxx.table1")
name=spark.read.csv('hdfs://default/xxx/xxx/movies_metadata.csv', header=True)
print('success')
步骤一说明

导入上一个步骤的输出数据到Jupyter。

步骤二操作
from pyspark.sql.functions import rand
df.orderBy(rand()).limit(1).select("user").show()
步骤二说明

从数据集中随机选择一个用户id,以便于后续进行检测。在limit()的括号中填入选择的数量。

步骤三操作
recommend=als_model.recommendForAllUsers(10)
recommend.select("recommendations").show(1)
def recommendforuser(userid):
res=recommend.filter(recommend.user==userid)
res=res.select("recommendations")
res=res.take(1)[0]["recommendations"]
tname=[]
for i in res:
temp=name.filter(name.id==i.movie)
n=temp.select("title").take(1)[0]["title"]
tname.append(n)
return tname
print('success')
步骤三说明 - 运用模型对用户进行推荐

使用function recommendForAllUsers()方程,在括号中填入每个用户需要推荐的电影数量。 使用recommendforuser()这个方程,将用户id对应推荐的电影id转化为电影名称,使推荐更直观。

步骤四操作
userid=21260
recommendforuser(userid)
步骤四说明

推荐实例,在function recommendforuser()括号中填入用户id,则会得到一个包含十部电影名称的队列。如示例中随机填入userid“21260”,会返回对应的推荐队列。

步骤五操作

注:

机构项目:

“xxx_xxx.table3″为org_xxx,xxx替换为当前机构名,table3为Hive目标表名,用户可自定义目标表名。

个人项目: “xxx_xxx.table3″为user_xxx,xxx替换为当前登录用户名,table3为Hive目标表名,用户可自定义目标表名。

recommend.write.format("hive").mode("overwrite").saveAsTable("xxx_xxx.table3")
print('success')
步骤五说明

将最佳预测结果存入目标 Hive 表中。

具体操作可参照下图。

8
9

备注:建议完成整个步骤后将该项工程暂停,可参考下图。

欢迎访问网站,注册体验BDOS Online,网站地址:https://bo.linktimecloud.com/

此图像的alt属性为空;文件名为landing.png

留言

评论

${{item['author_name']}} 回复 ${{idToContentMap[item.parent] !== undefined ? idToContentMap[item.parent]['author_name'] : ''}} · ${{item.date.slice(0, 10)}} 回复

暂时还没有一条评论.