Objective

Adapt one of your recommendation systems to work with Apache Spark and compare the performance with your previous iteration.

Things that were taken into consideration were the efficiency of the systems as well as Spark’s complexity. This assignment was completed using sparklyr in R.

Data Utilized and Loading

our Dataset is the MovieLense from https://grouplens.org/datasets/movielens/

We will be using the ratings csv file with more than 100 rating rows. It has userid, movieid, rating and timestamp.

Since determining the efficiency of Spark is one of the main goals of this assignment, ensuring the datasets were large enough to display any differences while keeping in mind R’s handling capacity was one of the points taken into consideration.

We will be using the RecommenderLab and Spark packages to create an ALS model which will be used for prediction.

# reading data
ratings = read.csv("https://raw.githubusercontent.com/theoracley/Data612/master/Project5/ratings.csv")

# transforming to a wide format
moviesData<-ratings%>% select (movieId, userId, rating) %>% spread (movieId,rating)

# converting the data set into a real rating matrix
movieMatrix <- as(as.matrix(moviesData[-c(1)]), "realRatingMatrix")

Using RecommenderLab

# splitting data on train and test sets
eval_Scheme<- evaluationScheme(movieMatrix, method = "split", train = 0.9, given = 5, goodRating = 3)
training <-getData(eval_Scheme, "train")
testing <-getData(eval_Scheme, "unknown")
testing_known <- getData(eval_Scheme, "known")

#  training ALS model
tic()
king_model <- Recommender(training, method = "ALS")
trainingTime <- toc(quiet = TRUE)


# making predictions - top 10
tic()
king_prediction<- predict (king_model, testing, n = 10, type = "topNList")
predictionTime <- toc(quiet = TRUE)

Using Spark

usually when working with spark, we follow this methodology: Connect-work-disconnect

  1. Connect using spark_connect()

  2. Do some work

  3. Disconnect using spark_disconnect()

I’m installing Spark in my local system using ethe following:

install the sparklyr package: install.packages("sparklyr")

call spark_install()

# -----spark installation in thsi order-----
# install.packages("sparklyr")
# library(sparklyr)
# spark_install()


# Connection to Spark
spark_conn <- spark_connect(master = "local")

# Split for training and testing (75%/25%)
spark_data <- ratings
size <- floor(0.75 * nrow(spark_data))

set.seed(98765)
training_records <- sample(seq_len(nrow(spark_data)), size = size)

training <- spark_data[training_records, ]
testing <- spark_data[-training_records, ]

#  moving data frames to Spark
spark_training <- sdf_copy_to(spark_conn, training, "train_ratings", overwrite = TRUE)
spark_testing <- sdf_copy_to(spark_conn, testing, "test_ratings", overwrite = TRUE)

Model training and Prediction

#  building our ALS model
tic()
model <- ml_als(spark_training, max_iter = 5, nonnegative = TRUE, rating_col = "rating", user_col = "userId", item_col = "movieId")
trainingTime_spark <- toc(quiet = TRUE)

# predicting the ratings
tic()
sparkPred<-ml_predict(model, spark_training)
head(sparkPred)
## # Source: spark<?> [?? x 5]
##   userId movieId rating timestamp prediction
##    <int>   <int>  <dbl>     <int>      <dbl>
## 1    350      12      3 864941118       3.06
## 2     44      12      1 869252043       1.68
## 3    120      12      3 860070182       2.83
## 4    294      12      1 966597190       1.97
## 5    492      12      3 863976249       3.13
## 6    524      12      1 852404800       1.94
predictionTime_spark <- toc(quiet = TRUE)


# here are our top 10 movies we recommend for each user
ml_recommend(model, type = c("item"), n = 10)
## # Source: spark<?> [?? x 4]
##    userId recommendations  movieId rating
##     <int> <list>             <int>  <dbl>
##  1     12 <named list [2]>  183897   6.13
##  2     12 <named list [2]>   86237   6.06
##  3     12 <named list [2]>   74226   6.06
##  4     12 <named list [2]>  134796   6.06
##  5     12 <named list [2]>  138966   6.06
##  6     12 <named list [2]>  171495   6.06
##  7     12 <named list [2]>    7071   6.06
##  8     12 <named list [2]>  117531   6.06
##  9     12 <named list [2]>  184245   6.06
## 10     12 <named list [2]>   84273   6.06
## # ... with more rows
# Disconnect
spark_disconnect(spark_conn)

Models Comparison

model1<-cbind(train=trainingTime$toc - trainingTime$tic, predict = predictionTime$toc - predictionTime$tic)
model2<-cbind(train=trainingTime_spark$toc - trainingTime_spark$tic, predict = predictionTime_spark$toc - predictionTime_spark$tic)

summary = rbind(model1, model2)
rownames(summary) <- c("RecommenderLab","Spark")
summary
##                train predict
## RecommenderLab  0.01  193.69
## Spark          11.34    7.00

Conclusion

Moving to spark is greatly advantageous when when processing large dataset. In our case, Spark did not perform well in our training phase, but did very well in the recommendation phase. Because of the distribution nature of Spark, it’s implementation becomes harder and requires parallel processing skills. For that, we must always evaluate complexity again data size. There is no necessity of using spark with its complexity if your data is not that large. Keeping it simple always wins.