Objective

The goal of this project is to give you practice working with a distributed recommender system. It is sufficient for this assignment to build out your application on a single node.

Adapt one of your recommendation systems to work with Apache Spark and compare the performance with your previous iteration. Consider the efficiency of the system and the added complexity of using Spark. You may complete the assignment using PySpark (Python), SparkR (R), sparklyr (R), or Scala.

Introduction

This project covers setting up a basic Spark environment in R. It relies on the sparklyr package and runs all Spark code in local mode. The main challenges of running Spark locally are configuring the environment and connecting to Spark.
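As a rough sketch, the local setup with sparklyr typically amounts to installing a Spark distribution once and then opening a local connection. The Spark version shown below is an illustrative assumption, not necessarily the one used in this project:

# One-time local setup sketch -- Spark version is illustrative only
# install.packages("sparklyr")
# sparklyr::spark_install(version = "2.4.3")
library(sparklyr)
sc <- spark_connect(master = "local")   # run Spark in local mode on this machine
spark_disconnect(sc)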

# Required libraries
library(recommenderlab)  # Matrix/recommender functions
## Warning: package 'recommenderlab' was built under R version 3.6.3
## Loading required package: Matrix
## Loading required package: arules
## Warning: package 'arules' was built under R version 3.6.3
## 
## Attaching package: 'arules'
## The following objects are masked from 'package:base':
## 
##     abbreviate, write
## Loading required package: proxy
## Warning: package 'proxy' was built under R version 3.6.3
## 
## Attaching package: 'proxy'
## The following object is masked from 'package:Matrix':
## 
##     as.matrix
## The following objects are masked from 'package:stats':
## 
##     as.dist, dist
## The following object is masked from 'package:base':
## 
##     as.matrix
## Loading required package: registry
## Registered S3 methods overwritten by 'registry':
##   method               from 
##   print.registry_field proxy
##   print.registry_entry proxy
library(dplyr)           # Data manipulation
## 
## Attaching package: 'dplyr'
## The following objects are masked from 'package:arules':
## 
##     intersect, recode, setdiff, setequal, union
## The following objects are masked from 'package:stats':
## 
##     filter, lag
## The following objects are masked from 'package:base':
## 
##     intersect, setdiff, setequal, union
library(tidyr)           # Data manipulation
## Warning: package 'tidyr' was built under R version 3.6.3
## 
## Attaching package: 'tidyr'
## The following objects are masked from 'package:Matrix':
## 
##     expand, pack, unpack
library(ggplot2)         # Plotting
library(tictoc)          # Operation timing
library(sparklyr)        # Spark processing
## Warning: package 'sparklyr' was built under R version 3.6.3
# Set up data frame for timing
timing <- data.frame(Method=character(), Training=double(), Predicting=double())

Data Set

For this project we used a product ratings dataset downloaded from Kaggle (https://www.kaggle.com/skillsmuggler/amazon-ratings). The full dataset contains 2,023,070 observations and 4 variables: User ID, Product ID, Rating, and Time Stamp. To make it more manageable, it was reduced to 3,562 users, 9,647 products, and 68,565 ratings. This is still large enough that the various processes take from a few seconds to a few minutes, making it possible to roughly compare performance.
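The exact filtering used to produce the reduced file is not reproduced here; a purely hypothetical dplyr sketch of one way such a reduction could be done (the file name and minimum-rating thresholds are assumptions for illustration) is:

# Hypothetical reduction sketch -- file name and thresholds are illustrative only
full <- read.csv("amazon_ratings_full.csv")            # assumed local copy of the Kaggle file
active_users   <- full %>% count(UserId)    %>% filter(n >= 5)  %>% pull(UserId)
rated_products <- full %>% count(ProductId) %>% filter(n >= 10) %>% pull(ProductId)
ratings_short  <- full %>% filter(UserId %in% active_users, ProductId %in% rated_products)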

# Import data from Github
ratings <- read.csv("https://raw.githubusercontent.com/vijay564/DATA612/master/Project_5/ratings_Short.csv")

ALS Model Using recommenderlab Package

After some research we found that Alternating Least Squares (ALS) is one of the simpler recommender methods to set up with Spark.
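For context, ALS factors the ratings matrix into user factors u_u and item factors v_i, alternately solving for one set while holding the other fixed, minimizing the standard regularized squared-error objective (lambda is the regularization weight):

$$\min_{U,V} \sum_{(u,i)\ \mathrm{observed}} \left(r_{ui} - \mathbf{u}_u^{\top}\mathbf{v}_i\right)^2 + \lambda\left(\sum_u \lVert \mathbf{u}_u \rVert^2 + \sum_i \lVert \mathbf{v}_i \rVert^2\right)$$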

# Data prep: UserId and ProductId are read as factors (R 3.6 read.csv default),
# so their integer codes can be used as row/column indices of the sparse matrix
ratingsMatrix <- sparseMatrix(as.integer(ratings$UserId), as.integer(ratings$ProductId), x = ratings$Rating)
colnames(ratingsMatrix) <- levels(ratings$ProductId)
rownames(ratingsMatrix) <- levels(ratings$UserId)
amazon <- as(ratingsMatrix, "realRatingMatrix")
# Split data into Test and Train
set.seed(88)
eval <- evaluationScheme(amazon, method = "split", train = 0.8, given = 5, goodRating = 3)
train <- getData(eval, "train")
known <- getData(eval, "known")
unknown <- getData(eval, "unknown")
# Training
tic()
modelALS <- Recommender(train, method = "ALS")
train_time <- toc(quiet = TRUE)
# Predicting
tic()
predALS <- predict(modelALS, newdata = known, type = "ratings")
predict_time <- toc(quiet = TRUE)
timing <- rbind(timing, data.frame(Method = "recommenderlab", 
                                   Training = round(train_time$toc - train_time$tic, 2), 
                                   Predicting = round(predict_time$toc - predict_time$tic, 2)))
# Accuracy
accALS <- calcPredictionAccuracy(predALS, unknown)
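For a standalone look at the recommenderlab result, calcPredictionAccuracy() returns a named vector of RMSE, MSE, and MAE that can be printed directly (the same values reappear in the comparison table below):

accALS  # named vector: RMSE, MSE, MAE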

ALS Model Using Spark

The model above can also be built in Spark. The general process is straightforward: set up a local Spark instance, copy the relevant data frames into Spark, fit the model and run predictions, and compare the results. Results are evaluated primarily using RMSE.

# Connection
java_path <- normalizePath("C:\\Program Files\\Java\\jre1.8.0_251")
Sys.setenv(JAVA_HOME = java_path)
Sys.getenv('JAVA_HOME')
## [1] "C:\\Program Files\\Java\\jre1.8.0_251"
sc <- spark_connect(master = "local")
# Prepare data
spark_df <- ratings
spark_df$UserId <- as.integer(spark_df$UserId)
spark_df$ProductId <- as.integer(spark_df$ProductId)
# Split for training and testing
which_train <- sample(x = c(TRUE, FALSE), size = nrow(spark_df),
                      replace = TRUE, prob = c(0.8, 0.2))
train_df <- spark_df[which_train, ]
test_df <- spark_df[!which_train, ]
# Move to Spark
spark_train <- sdf_copy_to(sc, train_df, "train_ratings", overwrite = TRUE)
spark_test <- sdf_copy_to(sc, test_df, "test_ratings", overwrite = TRUE)
# Build model
tic()
sparkALS <- ml_als(spark_train, max_iter = 5, nonnegative = TRUE, 
                   rating_col = "Rating", user_col = "UserId", item_col = "ProductId")
train_time <- toc(quiet = TRUE)
# Run prediction
tic()
sparkPred <- sparkALS$.jobj %>%
  invoke("transform", spark_dataframe(spark_test)) %>%
  collect()
predict_time <- toc(quiet = TRUE)
timing <- rbind(timing, data.frame(Method = "Spark", 
                                   Training = round(train_time$toc - train_time$tic, 2), 
                                   Predicting = round(predict_time$toc - predict_time$tic, 2)))
sparkPred <- sparkPred[!is.na(sparkPred$prediction), ] # Remove NaN due to data set splitting
# Calculate error
mseSpark <- mean((sparkPred$Rating - sparkPred$prediction)^2)
rmseSpark <- sqrt(mseSpark)
maeSpark <- mean(abs(sparkPred$Rating - sparkPred$prediction))
# Disconnect
spark_disconnect(sc)
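As a side note, a hedged alternative to invoking the underlying Java transform method is sparklyr's ml_predict() helper, assuming ml_als() returns a fitted ALS transformer as it does here. This sketch would have to run before spark_disconnect(sc) and is not the code that was timed above:

# Alternative prediction sketch using ml_predict() -- not the timed approach above
sparkPredAlt <- ml_predict(sparkALS, spark_test) %>% collect()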

Analysis

Even though the data was split differently for the recommenderlab and Spark models, the RMSE values are very similar, which is expected when the same ALS method is applied to the same data. The minor difference is due to the different observations that ended up in each train/test split.
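For reference, the metrics in the table below follow the usual definitions over the n held-out ratings r_i and predictions r̂_i:

$$\mathrm{MSE} = \frac{1}{n}\sum_{i=1}^{n}\left(r_i - \hat{r}_i\right)^2, \qquad \mathrm{RMSE} = \sqrt{\mathrm{MSE}}, \qquad \mathrm{MAE} = \frac{1}{n}\sum_{i=1}^{n}\left|r_i - \hat{r}_i\right|$$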

accuracy <- rbind(accALS, data.frame(RMSE = rmseSpark, MSE = mseSpark, MAE = maeSpark))
rownames(accuracy) <- c("recommenderlab ALS", "Spark ALS")
knitr::kable(accuracy, format = "html") %>%
  kableExtra::kable_styling(bootstrap_options = c("striped", "hover"))
                     RMSE       MSE        MAE
recommenderlab ALS   1.338125   1.790578   1.048341
Spark ALS            1.336929   1.787379   1.030275

Conclusion

The big difference is in performance. With the recommenderlab package the prediction on the test set took almost 8 minutes, while Spark finished it in a couple of seconds. Training is noticeably slower with Spark, but still only a few seconds. This evaluation was done on a local instance of Spark, which has some limitations; in a distributed environment performance can be improved further.

knitr::kable(timing, format = "html", row.names = FALSE) %>%
  kableExtra::kable_styling(bootstrap_options = c("striped", "hover"))
Method           Training (sec)   Predicting (sec)
recommenderlab   0.0              476.44
Spark            7.5              2.36

From the above we can see that even a local instance of Spark improved the overall performance. With a multi-node cluster environment the performance gain would be even greater.