Introduction

Adapt one of your recommendation systems to work with Apache Spark and compare the performance with your previous iteration. Consider the efficiency of the system and the added complexity of using Spark. You may complete the assignment using PySpark (Python), SparkR (R), sparklyr (R), or Scala.

# Required libraries

library(recommenderlab)  # ALS recommender and evaluation utilities
library(tictoc)          # simple timing of code blocks
library(sparklyr)        # R interface to Apache Spark
library(dplyr)           # data manipulation
library(tidyr)           # data tidying
library(ggplot2)         # plotting


# Set up data frame for timing
timing <- data.frame(Method=character(), Training=double(), Predicting=double())

Data Set

The data set contains ratings for beauty products sold on Amazon.com. It was downloaded from Kaggle (https://www.kaggle.com/skillsmuggler/amazon-ratings).

# Read ratings; UserId and ProductId must be factors for the sparse matrix step below
ratings <- read.csv("https://raw.githubusercontent.com/ErindaB/Data-612/master/Ratings.csv",
                    stringsAsFactors = TRUE)
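
A quick structural check is useful before reshaping the data (a sketch; the file is assumed to contain the UserId, ProductId, and Rating columns used below):

str(ratings)    # column types: UserId and ProductId should be factors
head(ratings)   # first few rows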

ALS Model Using recommenderlab Package

# Data prep: build a sparse user x item matrix from the factor codes
RMatrix <- sparseMatrix(i = as.integer(ratings$UserId),
                        j = as.integer(ratings$ProductId),
                        x = ratings$Rating)
colnames(RMatrix) <- levels(ratings$ProductId)
rownames(RMatrix) <- levels(ratings$UserId)
amazon_ratings <- as(RMatrix, "realRatingMatrix")
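# Optional sanity check (not in the original workflow): print the matrix
# object (dimensions and number of ratings) and the rating distribution
amazon_ratings
summary(getRatings(amazon_ratings))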
# Train/test split: 80% of users for training; each test user keeps 5 ratings
# as "known" input, with the rest held out as "unknown" for scoring
set.seed(88)
evaluation <- evaluationScheme(amazon_ratings, method = "split", train = 0.8,
                               given = 5, goodRating = 3)
train <- getData(evaluation, "train")
known <- getData(evaluation, "known")
unknown <- getData(evaluation, "unknown")
# Training
tic()
modelALS <- Recommender(train, method = "ALS")
train_time <- toc(quiet = TRUE)
# Predicting
tic()
predALS <- predict(modelALS, newdata = known, type = "ratings")
predict_time <- toc(quiet = TRUE)

timing <- rbind(timing, data.frame(Method = "recommenderlab", 
                                   Training = round(train_time$toc - train_time$tic, 2), 
                                   Predicting = round(predict_time$toc - predict_time$tic, 2)))

# Accuracy
accALS <- calcPredictionAccuracy(predALS, unknown)
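
Although the comparison below uses rating predictions only, the same fitted model can also produce top-N lists. A minimal sketch (not part of the timing comparison) for the first test user:

# Top-5 product recommendations for one test user
topN <- predict(modelALS, newdata = known[1], n = 5, type = "topNList")
as(topN, "list")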

ALS Model Using Spark

# Connection
sc <- spark_connect(master = "local")
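
For a larger data set, the default driver memory of a local connection can become a bottleneck. A minimal sketch of raising it with spark_config() before connecting (the 4G value is purely illustrative, not tuned):

config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "4G"  # illustrative setting
sc <- spark_connect(master = "local", config = config)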

# Prepare data
# Prepare data: Spark's ALS requires numeric user and item IDs,
# so convert the factor columns to their integer codes
spark_df <- ratings
spark_df$UserId <- as.integer(spark_df$UserId)
spark_df$ProductId <- as.integer(spark_df$ProductId)

# Split for training and testing (80/20, sampled per row)
set.seed(88)  # make the random split reproducible
which_train <- sample(x = c(TRUE, FALSE), size = nrow(spark_df),
                      replace = TRUE, prob = c(0.8, 0.2))
train_df <- spark_df[which_train, ]
test_df <- spark_df[!which_train, ]
# Move to Spark
spark_train <- sdf_copy_to(sc, train_df, "train_ratings", overwrite = TRUE)
spark_test <- sdf_copy_to(sc, test_df, "test_ratings", overwrite = TRUE)
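
The split above is done in R before copying the two pieces to Spark. An alternative is to copy the full table once and split on the Spark side with sdf_random_split(); a sketch under that assumption:

# Alternative: copy once, then split inside Spark
spark_full <- sdf_copy_to(sc, spark_df, "ratings", overwrite = TRUE)
splits <- sdf_random_split(spark_full, training = 0.8, test = 0.2, seed = 88)
spark_train <- splits$training
spark_test <- splits$test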

# Build model
tic()
sparkALS <- ml_als(spark_train, max_iter = 5, nonnegative = TRUE, 
                   rating_col = "Rating", user_col = "UserId", item_col = "ProductId")
train_time <- toc(quiet = TRUE)
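
ml_als() also exposes the usual ALS hyperparameters; rank and reg_param in particular trade accuracy against training time. A sketch of a more explicit call (rank and reg_param shown at Spark's defaults, not tuned for this data set):

sparkALS <- ml_als(spark_train,
                   rating_col = "Rating", user_col = "UserId", item_col = "ProductId",
                   rank = 10,                     # number of latent factors
                   reg_param = 0.1,               # L2 regularization
                   max_iter = 5,
                   nonnegative = TRUE,
                   cold_start_strategy = "drop")  # drop NaN predictions up front
                                                  # instead of filtering after collect()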

# Run prediction
tic()
# ml_predict() is sparklyr's supported interface for scoring new data
sparkPred <- ml_predict(sparkALS, spark_test) %>%
  collect()
predict_time <- toc(quiet = TRUE)

timing <- rbind(timing, data.frame(Method = "Spark", 
                                   Training = round(train_time$toc - train_time$tic, 2), 
                                   Predicting = round(predict_time$toc - predict_time$tic, 2)))

sparkPred <- sparkPred[!is.na(sparkPred$prediction), ] # Remove NaN predictions for cold-start users/items created by the random split

# Calculate error metrics on the collected predictions
mseSpark <- mean((sparkPred$Rating - sparkPred$prediction)^2)   # mean squared error
rmseSpark <- sqrt(mseSpark)                                     # root mean squared error
maeSpark <- mean(abs(sparkPred$Rating - sparkPred$prediction))  # mean absolute error

# Disconnect
spark_disconnect(sc)

Analysis

Comparing the corresponding RMSE values gives a general idea of the accuracy of the two models. The values are very similar, which is to be expected when the same method (ALS) is applied to the same data. The minor difference comes from the different observations used for training and from differences in implementation.

accuracy <- rbind(accALS, data.frame(RMSE = rmseSpark, MSE = mseSpark, MAE = maeSpark))
rownames(accuracy) <- c("recommenderlab ALS", "Spark ALS")
knitr::kable(accuracy, format = "html") %>%
  kableExtra::kable_styling(bootstrap_options = c("striped", "hover"))
                     RMSE       MSE        MAE
recommenderlab ALS   1.338125   1.790578   1.048341
Spark ALS            1.341641   1.799999   1.032427

There is, however, a big difference in runtime. With the recommenderlab package it took over six minutes to run the prediction on the testing set; the 0.00 training time is not an error, since recommenderlab's ALS appears to be a lazy implementation that defers the actual factorization until prediction, which is also why its prediction step is so expensive. Training is noticeably slower with Spark, but still takes under twenty seconds, and prediction finishes in a few seconds. Since this evaluation was done on a local instance of Spark, it was still subject to the limitations of a single machine.

knitr::kable(timing, format = "html", row.names = FALSE) %>%
  kableExtra::kable_styling(bootstrap_options = c("striped", "hover"))
Method           Training (s)   Predicting (s)
recommenderlab           0.00          387.78
Spark                   19.83            3.61

Even with just a local instance, Spark improved overall performance. This is clearly the biggest advantage of distributed processing. The biggest disadvantage is also fairly obvious: a more complex implementation. Beyond the measured times, Spark is probably the better choice for large data sets of a million or more ratings, an impression admittedly formed in part while waiting for the recommenderlab ALS prediction to finish in R Markdown.

Additionally, it is important to consider how often a model needs to be retrained. If a recommender system must respond to changes quickly, performance again becomes key, and distributed processing is worth the added effort.