Data 612 Project 5 | Implementing a Recommender System on Spark

Assignment Instructions

Adapt one of your recommendation systems to work with Apache Spark and compare the performance with your previous iteration. Consider the efficiency of the system and the added complexity of using Spark.

Please include in your conclusion: For your given recommender system’s data, algorithm(s), and (envisioned) implementation, at what point would you see moving to a distributed platform such as Spark becoming necessary?

Introduction

For this assignment, we will create two ALS recommender systems based on the MovieLense dataset. One system will be built with the recommenderlab package (the non-distributed system), and the other will be integrated with Spark using the sparklyr package (the distributed system).

My goal is then to compare the performance of the two systems to assess the advantages that distributed systems hold over non-distributed systems.

Data

Here, we load the version of the MovieLense dataset that comes with the recommenderlab package. The dataset contains 99,392 ratings from 943 users on 1,664 movies.
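
The code in this report assumes the following packages have been loaded. A minimal setup chunk (inferred from the function calls used below, not shown in the original report) might look like this:

# Packages assumed by the code in this report.
library(recommenderlab)  # MovieLense data, evaluationScheme(), Recommender(), predict()
library(sparklyr)        # spark_connect(), sdf_copy_to(), ml_als(), ml_transform()
library(dplyr)           # %>% and mutate()
library(Metrics)         # rmse()
library(knitr)           # kable()
library(kableExtra)      # kable_styling(), add_header_above()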

set.seed(150)
data(MovieLense)
show(MovieLense)
## 943 x 1664 rating matrix of class 'realRatingMatrix' with 99392 ratings.

After loading the data, we keep only users who have rated more than 50 movies, and movies that have been rated more than 100 times.

movie_ratings <- MovieLense[rowCounts(MovieLense) > 50, colCounts(MovieLense) > 100]
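
As a quick sanity check, we can inspect the dimensions of the filtered rating matrix to see how many users and movies remain (output omitted here):

# Dimensions of the reduced rating matrix: users x movies.
dim(movie_ratings)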

Non-Distributed Recommender System

In this section we build the non-distributed recommender system using recommenderlab. Here, we split the MovieLense data, fit the ALS model, generate predictions from the model, and finally evaluate processing time and prediction accuracy.

Build the RecommenderLab ALS-based model

e <- evaluationScheme(movie_ratings, method = 'split', train = 0.9, given = 15)
rec_als_model <- Recommender(getData(e, 'train'), method = 'ALS')
rec_als_model
## Recommender of type 'ALS' for 'realRatingMatrix' 
## learned using 504 users.
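
The model above is fit with recommenderlab's default ALS settings. If tuning were needed, the defaults can be inspected through the method registry and overridden via the parameter argument. The parameter names below are those reported by the registry; this is a sketch of the mechanism rather than a tuned configuration.

# List the default parameters for the ALS method on real-valued rating matrices.
recommenderRegistry$get_entry('ALS', dataType = 'realRatingMatrix')

# Example of overriding selected parameters when fitting the model.
rec_als_tuned <- Recommender(getData(e, 'train'), method = 'ALS',
                             parameter = list(n_factors = 10, n_iterations = 10, lambda = 0.1))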

Perform predictions on the model and evaluate processing time and accuracy

# Time the prediction step: predict ratings for the 'known' portion of the test users.
start <- Sys.time()
predicted <- predict(rec_als_model, getData(e, 'known'), type = 'ratings')
stop <- Sys.time()
rec_als_model_accuracy <- calcPredictionAccuracy(predicted, getData(e, 'unknown'))
rec_als_model_accuracy
##      RMSE       MSE       MAE 
## 0.8935263 0.7983892 0.7092771
rec_time_taken <- stop - start
rec_time_taken
## Time difference of 12.13121 secs
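
Beyond predicting individual ratings, the same fitted model can produce top-N recommendation lists, which is how an ALS recommender is typically served in practice. A minimal sketch:

# Generate the top 5 recommendations for each user in the 'known' set.
top_n_pred <- predict(rec_als_model, getData(e, 'known'), type = 'topNList', n = 5)

# Inspect the recommended movies for the first user.
as(top_n_pred, 'list')[[1]]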

Distributed Recommender System

In this section we will build the distributed recommender system using the sparklyr package to integrate the system with Spark.

Here, we first transform the data into a format that Spark can work with, and then connect to Spark so that we can copy the data over as a Spark DataFrame.

Next we split the data in Spark and fit the ALS model. Finally, we make our predictions and measure how long Spark takes to process the data.

Build the distributed ALS model and perform predictions

# Transform the data: convert the rating matrix to a data frame and recode the user/item
# factors as numeric IDs, since Spark's ALS implementation requires numeric columns.
distributed_data <- movie_ratings %>%
  as(., 'data.frame') %>%
  mutate(user = as.numeric(user), item = as.numeric(item))

# Open Spark connection.
sc <- spark_connect(master = "local", version = '3.0.0')

# Copy data to Spark.
spark_data <- sdf_copy_to(sc, distributed_data, 'sdf_rating_matrix', overwrite = TRUE)

# Split the Spark data into training and testing sets (90/10, mirroring the
# recommenderlab split above) and build the model.
split_data <- spark_data %>% sdf_random_split(training = 0.9, testing = 0.1)
distributed_als_model <- ml_als(split_data$training, max_iter = 5)

# Perform predictions and time the prediction step.
start <- Sys.time()
predicted <- ml_transform(distributed_als_model, split_data$testing) %>% collect()
stop <- Sys.time()
dis_time_taken <- stop - start

# Close Spark connection.
spark_disconnect(sc)
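
As a side note, the RMSE could also be computed inside Spark before collecting the predictions, and ml_als() exposes the usual ALS hyperparameters (rank, reg_param) as well as a cold_start_strategy argument for dropping NaN predictions on users or items unseen during training. The sketch below is illustrative only, would need to run before spark_disconnect(), and is not part of the timing comparison that follows.

# Fit ALS with explicit hyperparameters and drop cold-start NaN predictions.
tuned_als <- ml_als(split_data$training, rank = 10, reg_param = 0.1,
                    max_iter = 5, cold_start_strategy = 'drop')

# Score the test set and compute the RMSE inside Spark (no collect() required).
spark_predictions <- ml_transform(tuned_als, split_data$testing)
ml_regression_evaluator(spark_predictions, label_col = 'rating',
                        prediction_col = 'prediction', metric_name = 'rmse')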

Side-by-Side Performance Evaluation

To evaluate the performance of the models, we compare the time each model took to complete processing and the RMSE each model achieved.
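
Note that the timings above cover only the prediction step, since each model was fit before its timer started. For a fuller picture, model fitting could be timed as well; a minimal sketch for the recommenderlab side (the Spark side could be wrapped the same way), using system.time() instead of manual Sys.time() bookkeeping:

# Time model fitting plus prediction for the recommenderlab system.
rec_total_time <- system.time({
  fit  <- Recommender(getData(e, 'train'), method = 'ALS')
  pred <- predict(fit, getData(e, 'known'), type = 'ratings')
})
rec_total_time['elapsed']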

Processing time comparison

processing_time <- rbind(rec_time_taken, dis_time_taken)

rownames(processing_time) <- c('RecommenderLab Model', 'Distributed Model')
colnames(processing_time) <- c('Processing Time (secs)')

knitr::kable(processing_time, format = 'html') %>%
  kableExtra::kable_styling(bootstrap_options = c('striped', 'hover')) %>% 
  kableExtra::add_header_above(c('Model Processing Time Comparison' = 2))
Model Processing Time Comparison
                         Processing Time (secs)
RecommenderLab Model     12.131206
Distributed Model         2.736417

RMSE comparison

# Get the RMSE from the RecommenderLab model. recommenderlab's calcPredictionAccuracy()
# function calculates this for us, so we only need to fetch the value.
rec_rmse <- rec_als_model_accuracy[[1]]

# For the distributed model, we need to calculate the RMSE ourselves. The Metrics package
# provides the rmse() function for this task.
dis_rmse <- rmse(predicted$rating, predicted$prediction)


rmses <- rbind(rec_rmse, dis_rmse)

rownames(rmses) <- c('RecommenderLab Model','Distributed Model')
colnames(rmses) <- c('RMSE')

knitr::kable(rmses, format = 'html') %>%
  kableExtra::kable_styling(bootstrap_options = c('striped', 'hover')) %>% 
  kableExtra::add_header_above(c('Model RMSE Comparison' = 2))
Model RMSE Comparison
                         RMSE
RecommenderLab Model     0.8935263
Distributed Model        0.8915630

Conclusion

In terms of processing time, our distributed model holds a considerable edge over the non-distributed model. The non-distributed model took just over 12 seconds to process the data, whereas the distributed model completed processing in just under 3 seconds. Our dataset is relatively small, but this suggests that distributed systems hold a clear advantage over their non-distributed counterparts when it comes to processing large datasets.

The RMSE for the distributed model is marginally better than that of the non-distributed model. The difference here is small, but I believe it would become more pronounced with larger datasets, suggesting that the distributed model may also be the better choice in terms of prediction error.

Given the above findings, I believe moving to a distributed platform becomes necessary when processing larger datasets, especially when memory is limited. The advantage of a distributed system in these circumstances is that the workload can be split up and spread across numerous worker nodes, so it no longer has to fit within a single machine's memory. This prevents the kind of crash that would occur if we used the non-distributed model to process a dataset too large for the available memory.

Stephen Haslett

7/06/2020