互联网的飞速发展使得现代社会处于一个信息爆炸的时代,面对海量的数据与信息,如何筛选和过滤成为了一个具有挑战性的问题。近年来,推荐系统被证明是一种解决信息过载问题的有效工具,从根本上讲,推荐系统是通过为用户指引该用户不熟悉的新物品来解决信息过载现象的。了解用户对物品的偏好并精准预测用户喜欢的物品是构建推荐系统中比较困难的部分,协同过滤是解决上述问题的一种方法。

本文会基于Spark构建一个简单的推荐系统,使用的是基于物品的协同过滤算法。值得注意的是,本文不会去详细介绍各种推荐算法的基本原理和数学推导,也不会去用SparkML提供的交替最小二乘的隐语义模型推荐算法库进行实现,更不会涉及算法的相关改进,比如数据稀疏性、冷启动等等。本文旨在构建一个简易的电影推荐系统,依此来说明经典的推荐系统基本原理。除此之外,你还可以通过本文熟悉SparkRDD的基本操作。希望本文对你所有帮助,记得分享、点赞、在看。以下是全文。

基于物品的协同过滤算法原理

基于物品的协同过滤算法主要有两个步骤:

  • 计算物品之间的相似度
  • 根据物品的相似度和用户的历史行为给用户生成推荐列表

其中计算物品之间的相似度可以使用欧几里得距离,皮尔逊系数、Jaccard公式、余弦相似度等等。本文使用的是余弦相似度。关于具体的公式,可以自行搜索。下图是基于物品的推荐算法与基于用户的推荐算法示意图:

即给定的评分矩阵,横向计算(某用户对所有有物品的评分向量)可以得到用户与用户之间的相似度,纵向计算(所有用户对该物品的评分向量)可以得出物品之间的相似度。

数据集准备

选择的数据集为MovieLens的电影评分数据,主要包括两个文件:u.datau.item,其中u.data是用户的评分数据,数据格式为[userId movieId rating timestamp],u.item为电影的基本信息,包括电影id,电影名称,上映时间,数据格式为:[movieId|mobieName|time|…]。

代码实现

所有步骤均已在代码中进行了详细解释,在此不再赘述,代码如下:

object MovieRecommendations {

val spark = new SparkContext("local[*]", "Movie recommendations")

/**
* 读取用户评分表,对数据进行格式转换
* 原始数据格式为:[userId,movieId,rating,timestamp]
* 转换后的数据格式为:[(userId,(movieId,rating))]
*
* @return
*/
def mapUserIdAndMovieRatings(): RDD[(Int, (Int, Double))] = {
val dataFile: RDD[String] = spark.textFile("E:/u.data")

val userIdMappedWithMovieIdAndRating: RDD[(Int, (Int, Double))] = dataFile.map(line => {
val fields = line.split("\\s+") //按照空白字符进行切割
(fields(0).toInt, (fields(1).toInt, fields(2).toDouble))
})
userIdMappedWithMovieIdAndRating
}

/**
* 当对(userId,(movieId,rating))这种格式的数据进行自连接时,
* 会得到某一个用户的所有评分数据,比如:用户1分别对电影1,2进行了评分,
* (1,(1,2)) (1,(1,2))
* (1,(2,4)) JOIN (1,(2,4))
* 则JOIN之后的数据为:
* (1,((1,2),(1,2)))
* (1,((1,2),(2,4)))
* (1,((2,4),(1,2)))
* (1,((2,4),(2,4)))
* JOIN之后的数据会存在重复,
* 去重之后的结果为:(1,((1,2),(2,4)))
* 这样就得到了形如:userId-> ((movieId,rating),(movieId,rating))的数据
* 该数据格式表示某一用户对电影评分的两两组合
*
* @param userIdAndPairOfMovies
* @return
*/
def filterDuplicateMovieData(userIdAndPairOfMovies: (Int, ((Int, Double), (Int, Double)))): Boolean = {
val movieId1: Int = userIdAndPairOfMovies._2._1._1
val movieId2: Int = userIdAndPairOfMovies._2._2._1
movieId1 < movieId2
}

/**
* 当我们拿到形如userId-> ((movieId,rating),(movieId,rating))的数据时,
* 接下来需要计算电影之间的余弦相似度,所以需要将数据加工为形如:
* (MovieID1,MovieId2) -> (Rating1,Rating2)的格式
* 这样,就得到了两两电影组合及其对应的评分数据
* 也就拿到了一个评分矩阵,即所有用户对某一部电影的评分,构成了该电影的一个向量
* ----------------------------------------
* userId | movie1 | movie2| ... | movie3
* ----------------------------------------
* user1 | 2 | | ... | 5
* ----------------------------------------
* user2 | 3 | 5 | ... |
* ----------------------------------------
* user3 | 4 | 2 | ... | 3
* ----------------------------------------
*
* @param userIdAndMovieData
* @return
*/
def mapMoviePairsWithRatings(userIdAndMovieData: (Int, ((Int, Double), (Int, Double)))): ((Int, Int), (Double, Double)) = {
val movieId1 = userIdAndMovieData._2._1._1
val movieId2 = userIdAndMovieData._2._2._1

val rating1 = userIdAndMovieData._2._1._2
val rating2 = userIdAndMovieData._2._2._2

((movieId1, movieId2), (rating1, rating2))
}

/**
* 计算电影之间的余弦相似度
* 当我们得到了形如(MovieID1,MovieId2) -> (Rating1,Rating2)格式的数据
* 再按照(MovieID1,MovieId2)进行分组,这样,相同组合的电影的评分就会分到一个组中,
* 从而就得到了每个电影的评分对应的列向量
* 比如对于[movie1,movie2],所有用户对该组合的评分为:
*
* --------------------------
* userId | movie1 | movie2
* --------------------------
* user1 | 2 | 3
* --------------------------
* user2 | 3 | 5
* --------------------------
* user3 | 4 | 2
* --------------------------
*
* 即movie1对应的评分向量为:(2,3,4)
* movie2对应的评分向量为:(3,5,2)
* 则这两部电影的余弦相似度为:
* (2*3 + 3*5 + 4*2)/ Math.sqrt(2² + 3² + 4²) * Math.sqrt(3² + 5² + 2²)
* 返回的形式为[余弦相似度,评分个数]
* @param ratingPairs
* @return
*/
def computeCosineSimilarity(ratingPairs: Iterable[(Double, Double)]): (Double, Int) = {
var numOfPairs: Int = 0
var sumXX: Double = 0.0
var sumYY: Double = 0.0
var sumXY: Double = 0.0

for (ratingPair: (Double, Double) <- ratingPairs) {
val ratingX: Double = ratingPair._1
val ratingY: Double = ratingPair._2

sumXX += ratingX * ratingX
sumYY += ratingY * ratingY
sumXY += ratingX * ratingY
numOfPairs += 1
}

val numerator: Double = sumXY
val denominator: Double = Math.sqrt(sumXX) * Math.sqrt(sumYY)
val result: Double = numerator / denominator
(result, numOfPairs)
}

/**
* 读取电影信息表,匹配电影id对应的电影名称
*
* @return
*/
def mapMovieIdAndName(): Map[Int, String] = {

implicit val codec = Codec("UTF-8")
codec.onMalformedInput(CodingErrorAction.REPLACE)
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

val idAndNameMapped: Map[Int, String] = Source.fromFile("E:/u.item").getLines().map(line => {
val lineArr = line.split('|')
(lineArr.head.toInt, lineArr(1)) // 电影id和电影名称之间的映射
}).toMap[Int, String]

idAndNameMapped
}

/**
* 打印电影推荐列表
*
* @param moviesAndSimilarityScore 任意两部电影组合的余弦相似度
* @param args 传入参数,电影id
*/
def suggestTop10Movies(moviesAndSimilarityScore: RDD[((Int, Int), (Double, Int))], args: Array[String]): Unit = {
println("为您推荐以下电影列表: ")
// 余弦相似度的阈值,此处设为0.95
val scoreThreshold: Double = 0.95
// 至少要有大于等于100个的用户对该电影组合进行了评分
val coOccurenceThreshold: Double = 50.0

// 获取传入的电影id
val movieId: Int = args.head.toInt
// 找到与传入参数的电影比较相似的电影,并排除该电影本身
val moviePairsFilteredAccordingToThreshold: RDD[((Int, Int), (Double, Int))] = moviesAndSimilarityScore.filter((moviePairAndScore: ((Int, Int), (Double, Int))) => {
val moviePair: (Int, Int) = moviePairAndScore._1
val ratingAndNumOfPairs: (Double, Int) = moviePairAndScore._2
// 该movieId属于该电影组合,并且余弦相似度大于0.98,超过100个用户对该电影组合进行了评分
(moviePair._1 == movieId || moviePair._2 == movieId) && ratingAndNumOfPairs._1 > scoreThreshold && ratingAndNumOfPairs._2 > coOccurenceThreshold
})
// 取出前10部推荐的电影
// 即找到与该电影与相似度最为接近的电影
val first10MoviesAndTheirScores: Array[((Int, Int), (Double, Int))] = moviePairsFilteredAccordingToThreshold.take(10)

val idAndMovieNames = mapMovieIdAndName()
// 根据key 获取value值
println("与 " + idAndMovieNames(movieId) + " 比较相近的电影为:")
first10MoviesAndTheirScores.foreach(moviePairAndScore => {
val movie1: Int = moviePairAndScore._1._1
val movie2: Int = moviePairAndScore._1._2
var suggestedMovie: Int = movie2
if (movie2 == movieId) {
suggestedMovie = movie1
}

println(idAndMovieNames(suggestedMovie))
})
}

def main(args: Array[String]): Unit = {

//设置日志级别
Logger.getLogger("org").setLevel(Level.ERROR)

// 调用mapUserIdAndMovieRatings
// 原始数据格式为:[userId,movieId,rating,timestamp]
// 转换后的数据格式为:[(userId,(movieId,rating))]
val userIdMappedWithMovieIdAndRatings: RDD[(Int, (Int, Double))] = mapUserIdAndMovieRatings()

/**
* 打印该RDD数据,便于观察
* (196,(242,3.0))
* (186,(302,3.0))
* (22,(377,1.0))
*/
userIdMappedWithMovieIdAndRatings.take(3).foreach(println(_))


// 对于某一个用户,找到该用户对应的电影评分,并进行两两组合
// 输出的格式为:[(userId, ((movie1, rating1), (movie2, rating2)))]
val pairOfMoviesWatchedBySameUser: RDD[(Int, ((Int, Double), (Int, Double)))] = userIdMappedWithMovieIdAndRatings.join(userIdMappedWithMovieIdAndRatings)

/**
* 打印该RDD数据,便于观察数据
* (778,((94,2.0),(94,2.0)))
* (778,((94,2.0),(78,1.0)))
* (778,((94,2.0),(7,4.0)))
*/
pairOfMoviesWatchedBySameUser.take(3).foreach(println(_))


// 对于上面的join操作,会出现重复数据,我们只需要无重复的两两组合
val pairOfMoviesWithoutDuplicates: RDD[(Int, ((Int, Double), (Int, Double)))] = pairOfMoviesWatchedBySameUser.filter(filterDuplicateMovieData)

/**
* 打印该RDD数据,便于观察数据
* (778,((94,2.0),(1273,3.0)))
* (778,((94,2.0),(265,4.0)))
* (778,((94,2.0),(239,4.0)))
*/
pairOfMoviesWithoutDuplicates.take(3).foreach(println(_))

// 对上述的RDD进行map操作,加工成形如(movie1,movie2) => (rating1,rating2)
val moviePairAndRatings: RDD[((Int, Int), (Double, Double))] = pairOfMoviesWithoutDuplicates.map(mapMoviePairsWithRatings)

/**
* 打印该RDD数据,便于观察数据
* ((94,1273),(2.0,3.0))
* ((94,265),(2.0,4.0))
* ((94,239),(2.0,4.0))
*/
moviePairAndRatings.take(3).foreach(println(_))

//Combining all the same movie sets with their ratings
// 对上述的RDD按照key进行分组,这样相同的key,
// 即相同组合的电影对应的评分就构成了电影的评分矩阵
val groupOfRatingPairsForSameMoviePair: RDD[((Int, Int), Iterable[(Double, Double)])] = moviePairAndRatings.groupByKey()

/**
* 打印该RDD数据,便于观察数据
* ((220,977),CompactBuffer((1.0,2.0), (3.0,4.0), (2.0,3.0), (5.0,3.0), (2.0,1.0), (4.0,1.0)))
* 评分矩阵的形式为:
* |movie220|movie977|
* -------------------
* |1.0 |2.0 |
* |3.0 |4.0 |
* |2.0 |3.0 |
* |5.0 |3.0 |
* |2.0 |1.0 |
* |4.0 |1.0 |
*/
groupOfRatingPairsForSameMoviePair.take(1).foreach(println(_))

//根据上面的评分矩阵,计算两两电影组合的评分矩阵
// 注:mapValues算子仅作用在value上,不会改变key的值,依然返回(key,value)的形式
val moviePairsAndSimilarityScore: RDD[((Int, Int), (Double, Int))] = groupOfRatingPairsForSameMoviePair.mapValues(computeCosineSimilarity)

/**
* 打印该RDD数据,便于观察数据
* ((220,977),(0.9082259841451907,9))
* ((876,977),(0.9701425001453319,5))
* ((742,758),(0.8905344810811052,13))
*/
moviePairsAndSimilarityScore.take(3).foreach(println(_))


//根据余弦相似度计算相似的电影,打印电影推荐列表
// args为传入的电影id参数
if (args.length > 0)

suggestTop10Movies(moviePairsAndSimilarityScore, args)

else
println("请输入电影id")

}
}

总结

本文使用基于物品的协同过滤算法,在MovieLens数据集上通过SparkRDD实现了一个简单的推荐系统,并给出了详细的代码示例,你可以对此进行调试,一方面可以了解经典的推荐系统基本原理,另一方面也可以对SparkRDD编程有更加深刻的认识。

公众号『大数据技术与数仓』,回复『资料』领取大数据资料包