The purpose of this project was to work with a distributed recommender system. To do this, a recommender system adapted from a previous project was compared to one created with Apache Spark, taking into consideration both the efficiency of the two systems and Spark's complexity. This assignment was completed using sparklyr in R.

Data input

The MovieLens small dataset was used (retrieved from the https://grouplens.org/datasets/movielens/ website), containing 100,000 ratings and 1,300 tags applied to 9,000 movies by 700 users. This is the most recent release (10/2016) with the most manageable size for R.
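For reproducibility, the archive can also be fetched directly from GroupLens (a quick sketch; the URL follows the pattern on the dataset page, and the CSVs used below are loaded from a GitHub mirror instead):

# Sketch: download and unpack the MovieLens small dataset
url = "https://files.grouplens.org/datasets/movielens/ml-latest-small.zip"
download.file(url, destfile = "ml-latest-small.zip")
unzip("ml-latest-small.zip")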

Loading libraries/dataset

library(dplyr)          # data manipulation
library(tidyr)          # data reshaping
library(recommenderlab) # recommender algorithms and evaluation tools
library(sparklyr)       # R interface to Apache Spark
library(knitr)          # kable() tables

The MovieLens matrix was created by combining two datasets (movies and ratings). The movies dataset included the movieId, title and genres, while the ratings dataset included the userId, movieId, rating and a timestamp.

movies = read.csv("https://raw.githubusercontent.com/Galanopoulog/DATA643/master/Project%202/movies.csv", 
                   header = TRUE, sep = ",", stringsAsFactors = FALSE)
ratings = read.csv("https://raw.githubusercontent.com/Galanopoulog/DATA643/master/Project%202/ratings.csv", 
                    header = TRUE, sep =",", stringsAsFactors = FALSE)

kable(head(movies))
movieId  title                               genres
      1  Toy Story (1995)                    Adventure|Animation|Children|Comedy|Fantasy
      2  Jumanji (1995)                      Adventure|Children|Fantasy
      3  Grumpier Old Men (1995)             Comedy|Romance
      4  Waiting to Exhale (1995)            Comedy|Drama|Romance
      5  Father of the Bride Part II (1995)  Comedy
      6  Heat (1995)                         Action|Crime|Thriller
kable(head(ratings))
userId  movieId  rating   timestamp
     1       31     2.5  1260759144
     1     1029     3.0  1260759179
     1     1061     3.0  1260759182
     1     1129     2.0  1260759185
     1     1172     4.0  1260759205
     1     1263     2.0  1260759151

Original Rec Sys

For the original recommender system, the two datasets were merged, converted into a realRatingMatrix, and the minimum number of ratings per user was computed for use in the evaluation scheme. The resulting matrix retained 88,088 of the original 100,000 ratings. Since determining the efficiency of Spark is one of the main goals of this assignment, the dataset had to be large enough to expose any performance differences while staying within R's handling capacity.

# Merge the two datasets and keep only title, userId and rating
movRate = merge(movies, ratings, by = "movieId")
new = subset(movRate, select = c("title", "userId", "rating"))

# Convert to recommenderlab's sparse rating matrix, keeping only users
# and movies with more than 5 ratings
data = as(new, "realRatingMatrix")
data = data[rowCounts(data) > 5, colCounts(data) > 5]
data
## 3099 x 671 rating matrix of class 'realRatingMatrix' with 88088 ratings.
set.seed(100)
minimum = min(rowCounts(data))
print(paste0("Minimum number of ratings: ", minimum))
## [1] "Minimum number of ratings: 6"

After splitting the data into training and test sets, the recommender system was built with the "ALS" algorithm. Though recommenderlab offers several algorithms, including IBCF and UBCF, Spark's collaborative filtering method is ALS matrix factorization, so the ALS method was used for a fairer comparison. In the evaluation scheme below, given = 5 deliberately stays under the minimum of 6 ratings per user found above, so every test user has at least one withheld rating to evaluate against.
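As a brief aside, the intuition behind ALS matrix factorization can be sketched in a few lines of toy R code (purely an illustration with made-up factors, not recommenderlab's or Spark's internals):

# Toy sketch: approximate the ratings matrix R as U %*% t(V), where U holds
# user latent factors and V holds item latent factors
set.seed(1)
k = 2                               # number of latent factors (arbitrary here)
U = matrix(runif(3 * k), nrow = 3)  # 3 users x k factors
V = matrix(runif(4 * k), nrow = 4)  # 4 movies x k factors
R_hat = U %*% t(V)                  # predicted rating for every user/movie pair
round(R_hat, 2)
# ALS alternates between solving a least-squares problem for U with V fixed
# and for V with U fixed, until the reconstruction error stops improving.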

evaluation = evaluationScheme(data = data, method = "cross-validation", k = 10, given = 5, goodRating = 3.5)

evaluation
## Evaluation scheme with 5 items given
## Method: 'cross-validation' with 10 run(s).
## Good ratings: >=3.500000
## Data set: 3099 x 671 rating matrix of class 'realRatingMatrix' with 88088 ratings.
ev_train = getData(evaluation, "train")      # training set
ev_known = getData(evaluation, "known")      # test ratings shown to the model
ev_unknown = getData(evaluation, "unknown")  # test ratings withheld for scoring

# ALS
start1 = Sys.time()

als_model = Recommender(data = ev_train, method = "ALS")
als_model_pred = predict(object = als_model, newdata = ev_known, n = 10, type = "ratings")
final1 = Sys.time() - start1

To compare efficiency, the time it took R to train the model and generate predictions was measured in seconds.

print(paste0("Recommender System Computation Time: ", round(final1, 4), " seconds"))
## [1] "Recommender System Computation Time: 1.0056 seconds"

SPARK Rec Sys

Version 2.3.0 of Spark was installed and used to create the recommender system. A connection was made to a local Spark instance and the dataframe was copied to it.
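If Spark is not already installed locally, sparklyr can download it first (an assumed setup step, not shown in the original run):

spark_install(version = "2.3.0")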

connect = spark_connect(master = "local")
## * Using Spark: 2.3.0

One thing to note is that the dataframe had to contain only numerical values: Spark's ALS implementation expects numeric user and item IDs and would not accept character values, so the movie titles had to be substituted with the movieId.
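To map recommendations back to human-readable titles afterwards, a small lookup table can be kept on the R side (a sketch; the join would happen after collecting numeric predictions from Spark):

titleLookup = subset(movies, select = c("movieId", "title"))
# e.g. merge(collected_predictions, titleLookup, by = "movieId")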

sparkData = subset(movRate, select = c("movieId", "userId", "rating"))
sparkMovies = copy_to(connect, sparkData, overwrite = TRUE)
sparkMovies
## # Source:   table<sparkData> [?? x 3]
## # Database: spark_connection
##    movieId userId rating
##      <int>  <int>  <dbl>
##  1       1     23    3  
##  2       1    623    4.5
##  3       1    559    4  
##  4       1    306    3  
##  5       1    361    3  
##  6       1    357    5  
##  7       1    321    3  
##  8       1    560    4.5
##  9       1    531    3  
## 10       1     73    5  
## # ... with more rows

The recommender system was created, but I faced some difficulty generating predictions, which later hindered my ability to calculate accuracy values (RMSE, MSE and MAE) for comparison purposes. So far, the main theory involves the retention of ratings: unlike the original recommender system, the Spark data had no "minimum ratings" filter applied, so predictions for sparsely rated users or movies may have been returned as null, creating an error. For time purposes, this will have to be examined in the future.
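For what it's worth, the same minimum-ratings filter could likely be expressed with dplyr verbs, which sparklyr translates into Spark SQL window functions (a sketch I have not verified against the Spark table):

filtered = sparkMovies %>%
  group_by(userId) %>% filter(n() > 5) %>% ungroup() %>%
  group_by(movieId) %>% filter(n() > 5) %>% ungroup()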

start2 = Sys.time()

sparkModel = ml_als_factorization(sparkMovies, rating.column = "rating", user.column = "userId", item.column = "movieId", iter.max = 5)

# Left commented out: this step errored, as described above
#sparkPreds = sparkModel$.model %>%
#  invoke("transform", sparkMovies) %>%
#  collect()

final2 = Sys.time() - start2
print(paste0("SPARK Recommender System Computation Time: ", round(final2, 4), " seconds"))
## [1] "SPARK Recommender System Computation Time: 6.6535 seconds"

Comparison

Because the ALS function in recommenderlab was significantly slower than other algorithms (something I had noticed from past experience), Item-Based and User-Based Collaborative Filtering models were also created and timed.

# ALS
ALS = calcPredictionAccuracy(x = als_model_pred, data = ev_unknown, byUser = FALSE)

# Spark ALS (placeholder zeros: accuracy values could not be computed)
Spark_ALS = cbind(0, 0, 0)

# Item
start3 = Sys.time()
item_model = Recommender(data = ev_train, method = "IBCF")
item_model_pred = predict(object = item_model, newdata = ev_known, n = 10, type = "ratings")
final3 = Sys.time() - start3
IBCF = calcPredictionAccuracy(x = item_model_pred, data = ev_unknown, byUser = FALSE)

# User
start4 = Sys.time()
user_model = Recommender(data = ev_train, method = "UBCF")
user_model_pred = predict(object = user_model, newdata = ev_known, n = 10, type = "ratings")
final4 = Sys.time() - start4
UBCF = calcPredictionAccuracy(x = user_model_pred, data = ev_unknown, byUser = FALSE)

The comparison table below shows the accuracy values of the algorithms (with the exception of Spark’s) and the time it took to compute each recommender system.
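For reference, the three accuracy measures reported by calcPredictionAccuracy correspond to the following formulas, sketched here with hypothetical actual and predicted vectors:

rmse = function(actual, predicted) sqrt(mean((actual - predicted)^2, na.rm = TRUE))
mse  = function(actual, predicted) mean((actual - predicted)^2, na.rm = TRUE)
mae  = function(actual, predicted) mean(abs(actual - predicted), na.rm = TRUE)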

tables = (cbind(rbind(IBCF, UBCF, ALS, Spark_ALS), rbind(final3, final4, final1, final2)))
colnames(tables) = c("RMSE", "MSE", "MAE", "Time")
rownames(tables) = c("IBCF", "UBCF", "ALS", "Spark_ALS")
kable(round(tables,3))
            RMSE    MSE    MAE   Time
IBCF       1.626  2.643  1.227  5.799
UBCF       1.051  1.105  0.816  2.266
ALS        0.990  0.980  0.775  1.006
Spark_ALS  0.000  0.000  0.000  6.654
spark_disconnect(connect)

Conclusion

Though conclusions can't be drawn from the predictive performance of Spark's recommender system, the time taken to build one in Spark does not appear favorable. If this is truly the case and not a matter of human error, it may be better to run Spark on top of a different platform. A likely explanation is that Spark's fixed overhead (starting a local JVM-backed session and serializing data to it) dominates on a dataset this small; Spark is probably better suited to large datasets of a million or more ratings/entries. This is said mostly as a personal impression formed while waiting for the ALS method to finish in R Markdown.

One of the things that somewhat disheartened me was Spark's lack of variety in terms of algorithms. Where recommenderlab had several to pick from, Spark appears to rely primarily on ALS matrix factorization for collaborative filtering. Also, since this was my first time using Apache Spark, handling errors may have felt more difficult than usual; the platform itself (in R, at least) is probably not as complicated as I currently perceive it to be.