Assignment

The goal of this project is to give you practice working with a distributed recommender system. For this assignment, it is sufficient to build out your application on a single node.

Adapt one of your recommendation systems to work with Apache Spark and compare its performance with your previous iteration. Consider the efficiency of the system and the added complexity of using Spark. You may complete the assignment using PySpark (Python), SparkR (R), sparklyr (R), or Scala.

#Libraries
library(tidyverse)
library(kableExtra)
library(knitr)
library(recommenderlab)
library(reshape2)
library(sparklyr)
library(tictoc)

Data Setup

This analysis uses the MovieLense dataset that ships with recommenderlab (943 users by 1664 movies). We fit matrix factorization models with Alternating Least Squares (ALS) in both recommenderlab and Spark, then compare their accuracy and run time.
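
To make the ALS idea concrete, here is a minimal base-R sketch of alternating least squares on a tiny matrix, with NA marking missing ratings. It is not the implementation used by recommenderlab or Spark. The function name als_toy, the rank k, the penalty lambda, the iteration count, and the toy matrix are all assumptions chosen for illustration.

```r
# Toy ALS sketch (illustrative only; not the recommenderlab or Spark code).
# als_toy, k, lambda, and iters are hypothetical names and values.
als_toy <- function(R, k = 2, lambda = 0.01, iters = 50, seed = 1) {
  set.seed(seed)
  n_users <- nrow(R)
  n_items <- ncol(R)
  # Small random starting factors
  U <- matrix(rnorm(n_users * k, sd = 0.1), n_users, k)
  V <- matrix(rnorm(n_items * k, sd = 0.1), n_items, k)
  observed <- !is.na(R)
  ridge <- lambda * diag(k)
  for (iter in seq_len(iters)) {
    # Hold item factors fixed; solve a ridge regression per user
    for (u in seq_len(n_users)) {
      idx <- which(observed[u, ])
      if (length(idx) == 0) next
      Vi <- V[idx, , drop = FALSE]
      U[u, ] <- solve(crossprod(Vi) + ridge, crossprod(Vi, R[u, idx]))
    }
    # Hold user factors fixed; solve a ridge regression per item
    for (i in seq_len(n_items)) {
      idx <- which(observed[, i])
      if (length(idx) == 0) next
      Uu <- U[idx, , drop = FALSE]
      V[i, ] <- solve(crossprod(Uu) + ridge, crossprod(Uu, R[idx, i]))
    }
  }
  list(U = U, V = V)
}

# Made-up rank-1 ratings with two entries hidden
R_toy <- outer(c(1, 2, 3, 4), c(1, 0.5, 1.25))
R_toy[1, 2] <- NA
R_toy[3, 3] <- NA

fit <- als_toy(R_toy)
pred <- fit$U %*% t(fit$V)
train_rmse <- sqrt(mean((R_toy - pred)[!is.na(R_toy)]^2))
print(train_rmse)
```

Each pass alternates between the two sets of factors, which is why the per-user and per-item solves are independent of one another and can be spread across workers in a distributed setting.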

GroupLens Research has collected and made available rating data sets from the MovieLens website (http://movielens.org). The data sets were collected over various periods of time, depending on the size of the set.

set.seed(612)

#MovieLens
data("MovieLense")
movielense <- MovieLense
movies <- as(movielense, "data.frame")

movies <- transform(movies, itemid = as.numeric(factor(item)))
colnames(movies) <- c("userId", "Movie", "rating", "movieId")

ratings <- movies %>% select(userId, movieId, rating) 
ratings$userId <- as.double(ratings$userId)

#Convert Data frame to user item matrix
user_item <- acast(ratings, userId~movieId, value.var="rating")

#Convert matrix into realRatingMatrix using recommenderLab package
user_item <- as(as.matrix(user_item), "realRatingMatrix")
dim(user_item)
## [1]  943 1664
#ratings_movies <- user_item[rowCounts(user_item)>100, colCounts(user_item)>100]
ratings_movies <- user_item

dim(ratings_movies)
## [1]  943 1664
rating_movies <- as(ratings_movies, "realRatingMatrix")

Building Model

recommenderlab

  • Build an Item-Based Collaborative Filtering (IBCF) model and compute its error (RMSE, MSE, and MAE)
  • Fit a matrix factorization model with Alternating Least Squares (ALS)
  • Capture the timings in order to compare with the Spark ML method
  • Generate the error values RMSE, MSE, and MAE

    eval_scheme <- evaluationScheme(data = rating_movies, method = "split", train = 0.8, given = -1, goodRating = 3, k = 1)
    
    #IBCF
    eval_recommender_ibcf <- Recommender(data = getData(eval_scheme, "train"), method = "IBCF", parameter = NULL)
    eval_prediction_ibcf <- predict(object = eval_recommender_ibcf, newdata = getData(eval_scheme, "known"), n = 10, type = "ratings")
    calcPredictionAccuracy(x = eval_prediction_ibcf, data = getData(eval_scheme, "unknown"), byUser = FALSE)
    ##     RMSE      MSE      MAE 
    ## 1.365566 1.864772 1.045833
    
    #ALS: time the training step
    tic()
    m_ALS <- Recommender(getData(eval_scheme, "train"), method = "ALS")
    training_time <- toc(quiet = TRUE)
    
    #Predicting
    tic()
    predALS <- predict(m_ALS, newdata = getData(eval_scheme, "known"), type = "ratings")
    predict_time <- toc(quiet = TRUE)
    
    #Elapsed seconds for training and prediction
    Training <- round(training_time$toc - training_time$tic, 2)
    Prediction <- round(predict_time$toc - predict_time$tic, 2)
    
    assess <- data.frame(Method = "recommenderlab", Training = Training, Prediction = Prediction)
    
    # Evaluate accuracy
    eval_ALS <- calcPredictionAccuracy(predALS, getData(eval_scheme, "unknown"))
    
    kable(eval_ALS,caption = "recommenderlab - Error Report") %>%
      kable_styling(bootstrap_options = c("striped", "hover", "condensed", "responsive")) %>%
      row_spec(0, bold = T, color = "white", background = "#ea7872") 
    recommenderlab - Error Report
    Metric  Value
    RMSE    0.9477607
    MSE     0.8982504
    MAE     0.7571143
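
The tic()/toc() pairs above can be wrapped in a small helper so that every step is timed the same way. This is a sketch that uses base R's system.time() instead of tictoc. The helper name time_it is an assumption for illustration and does not appear in the original analysis.

```r
# Hypothetical helper: evaluate an expression and return its value
# together with the elapsed wall-clock time in seconds.
time_it <- function(expr) {
  elapsed <- system.time(value <- expr)[["elapsed"]]
  list(value = value, seconds = elapsed)
}

# Example with a cheap stand-in computation
timed <- time_it(sum(sqrt(1:1e5)))
print(timed$seconds)
```

With this helper, a call such as time_it(Recommender(getData(eval_scheme, "train"), method = "ALS")) would return both the fitted model and its training time in one object.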

sparklyr

  • Build an Alternating Least Squares (ALS) model using Spark ML
  • Capture the timings in order to compare with the recommenderlab method
  • Generate the error values RMSE, MSE, and MAE

    #Spark Connection
    sc <- spark_connect(master = "local")
    #Assign the dataset to Spark
    spark_df <- ratings
    
    #Building Train and Test dataset
    train <- sample(x = c(TRUE, FALSE), size = nrow(spark_df),
                          replace = TRUE, prob = c(0.8, 0.2))
    train_df <- spark_df[train, ]
    test_df <- spark_df[!train, ]
    
    #Copy function to Spark
    spark_train <- sdf_copy_to(sc, train_df, "train_ratings", overwrite = TRUE)
    spark_test <- sdf_copy_to(sc, test_df, "test_ratings", overwrite = TRUE)
    
    tic()
    sparkALS <- ml_als(spark_train, max_iter = 5, nonnegative = TRUE, 
                       rating_col = "rating", user_col = "userId", item_col = "movieId")
    
    train_time <- toc(quiet = TRUE)
    
    tic()
    sparkPred <- sparkALS$.jobj %>%
      invoke("transform", spark_dataframe(spark_test)) %>% collect()
    
    predict_time <- toc(quiet = TRUE)
    
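    #Record the timings
    assess <- rbind(assess, data.frame(Method = "Spark",
                                       Training = round(train_time$toc - train_time$tic, 5),
                                       Prediction = round(predict_time$toc - predict_time$tic, 2)))
    
    #Drop rows with missing predictions (Spark ALS returns NaN for users or movies absent from the training split)
    sparkPred <- sparkPred[!is.na(sparkPred$prediction), ] 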
    
    #Calculate error
    mse_spark <- mean((sparkPred$rating - sparkPred$prediction)^2)
    rmse_spark <- sqrt(mse_spark)
    mae_spark <- mean(abs(sparkPred$rating - sparkPred$prediction))
    
    #Disconnect
    spark_disconnect(sc)
    #Building accuracy table
    Type <- c("RMSE", "MSE", "MAE")
    value <- c(rmse_spark, mse_spark, mae_spark)
    df <- data.frame(Type, value)
    
    kable(df,caption = "sparklyr - Error Report") %>%
      kable_styling(bootstrap_options = c("striped", "hover", "condensed", "responsive")) %>%
      row_spec(0, bold = T, color = "white", background = "#ea7872") 
    sparklyr - Error Report
    Type  value
    RMSE  0.9267318
    MSE   0.8588317
    MAE   0.7405138
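
The error calculation above can be collected into one reusable function. The following is a minimal base-R sketch, assuming two numeric vectors of equal length. The name error_report and the extra Coverage value (the share of rows that received a usable prediction) are assumptions added for illustration, and the ratings are made up rather than taken from MovieLense.

```r
# Hypothetical helper: RMSE, MSE, and MAE over rows with a usable prediction,
# plus the share of rows that could be scored at all.
error_report <- function(actual, predicted) {
  ok <- !is.na(predicted)   # is.na() is TRUE for both NA and NaN
  err <- actual[ok] - predicted[ok]
  mse <- mean(err^2)
  c(RMSE = sqrt(mse), MSE = mse, MAE = mean(abs(err)), Coverage = mean(ok))
}

# Made-up ratings; the third prediction is missing
report <- error_report(actual = c(4, 3, 5, 2),
                       predicted = c(3.5, 3, NaN, 2.5))
print(round(report, 4))
```

Reporting coverage alongside the errors makes it visible when many test rows were dropped, since errors computed on the remaining rows alone can look better than the model really is.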

Accuracy Comparison

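  • The two ALS implementations differ only marginally in accuracy: Spark's RMSE (0.927) is about 2% lower than recommenderlab's (0.948). The two models were scored on different train/test splits, so the gap is indicative rather than exact

    acc <- rbind(eval_ALS, data.frame(RMSE = rmse_spark, MSE = mse_spark, MAE = mae_spark))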
    rownames(acc) <- c("recommenderlab ALS", "Spark ALS")
    
    kable(acc,caption = "Error Accuracy Comparison") %>%
      kable_styling(bootstrap_options = c("striped", "hover", "condensed", "responsive")) %>%
      row_spec(0, bold = T, color = "white", background = "#ea7872") 
    Error Accuracy Comparison
    Method              RMSE       MSE        MAE
    recommenderlab ALS  0.9477607  0.8982504  0.7571143
    Spark ALS           0.9267318  0.8588317  0.7405138

Performance Evaluation

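  • Spark was much faster at prediction (2.61 s vs. 54.66 s), but its training step was slower than recommenderlab's (8.97 s vs. 0.02 s)
  • The near-zero recommenderlab training time suggests that its ALS implementation defers most of the factorization work to predict(), so total time (about 11.6 s for Spark vs. 54.7 s for recommenderlab) is the fairer comparison
  • Running Spark on a single local machine adds startup and job-scheduling overhead, which may explain part of its slower training
  • On a cluster, Spark automatically partitions the data and distributes the partitions across nodes, which is where its speed advantage is expected to grow

    kable(assess,caption = "Performance Comparison") %>%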
      kable_styling(bootstrap_options = c("striped", "hover", "condensed", "responsive")) %>%
      row_spec(0, bold = T, color = "white", background = "#ea7872") 
    Performance Comparison (times in seconds)
              Method          Training  Prediction
    elapsed   recommenderlab  0.02      54.66
    elapsed1  Spark           8.97      2.61
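
Because the two packages divide their work between training and prediction differently, it can help to compare total time as well. The sketch below simply re-enters the timings reported in the table above and adds a Total column. The names timings and speedup are assumptions for illustration.

```r
# Timings copied from the Performance Comparison table (seconds)
timings <- data.frame(
  Method = c("recommenderlab", "Spark"),
  Training = c(0.02, 8.97),
  Prediction = c(54.66, 2.61)
)

# Total time per method
timings$Total <- timings$Training + timings$Prediction

# Ratio of recommenderlab's total time to Spark's
speedup <- timings$Total[1] / timings$Total[2]

print(timings)
print(round(speedup, 1))
```

On these reported numbers the totals are 54.68 s for recommenderlab and 11.58 s for Spark, a ratio of roughly 4.7. This is a single run on one machine, so the ratio should be read as a rough indication only.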

Summary

  • Spark is the clear winner on total run time: it delivered comparable accuracy while predicting far faster than recommenderlab, even on a single node
  • recommenderlab is intuitive and easy to use and handled this small dataset well, but it may not be suitable for real-world recommendation models with billions of data points
  • Spark handles iterative and interactive algorithms more efficiently than the traditional MapReduce paradigm and other packages available for recommendation. On a cluster, it automatically partitions RDDs and distributes the partitions across nodes to speed up processing