The goal of this project is give you practice beginning to work with a distributed recommender system. It is sufficient for this assignment to build out your application on a single node.
Adapt one of your recommendation systems to work with Apache Spark and compare the performance with your previous iteration. Consider the efficiency of the system and the added complexity of using Spark. You may complete the assignment using PySpark (Python), SparkR (R) , sparklyr (R), or Scala.
#Libraries
library(tidyverse)
library(kableExtra)
library(knitr)
library(recommenderlab)
library(reshape2)
library(sparklyr)
library(tictoc)MovieLense Data is chosen to perform recommendation. We explore the Matrix Factorization technique using ALS in recommenderlab and Spark to evaluate the performance.
GroupLens Research has collected and made available rating data sets from the MovieLens web site (http://movielens.org). The data sets were collected over various periods of time, depending on the size of the set
set.seed(612)
#MovieLens
data("MovieLense")
movielense <- MovieLense
movies <- as(movielense, "data.frame")
movies <- transform(movies, itemid = as.numeric(factor(item)))
colnames(movies) <- c("userId", "Movie", "rating", "movieId")
ratings <- movies %>% select(userId, movieId, rating)
ratings$userId <- as.double(ratings$userId)
#Convert Data frame to user item matrix
user_item <- acast(ratings, userId~movieId, value.var="rating")
#Convert matrix into realRatingMatrix using recommenderLab package
user_item <- as(as.matrix(user_item), "realRatingMatrix")
dim(user_item)## [1] 943 1664
#ratings_movies <- user_item[rowCounts(user_item)>100, colCounts(user_item)>100]
ratings_movies <- user_item
dim(ratings_movies)## [1] 943 1664
eval_scheme <- evaluationScheme(data = rating_movies, method = "split", train = 0.8, given = -1, goodRating = 3,k=1)
#IBCF
eval_recommender_ibcf <- Recommender(data = getData(eval_scheme, "train"), method = "IBCF", parameter = NULL)
eval_prediction_ibcf <- predict(object = eval_recommender_ibcf, newdata = getData(eval_scheme, "known"), n = 10, type = "ratings")
calcPredictionAccuracy(x = eval_prediction_ibcf, data = getData(eval_scheme, "unknown"), byUser = FALSE)## RMSE MSE MAE
## 1.365566 1.864772 1.045833
tic()
m_ALS <- Recommender(getData(eval_scheme, "train"), method = "ALS")
training_time <- toc(quiet = TRUE)
#Predicting
tic()
predALS <- predict(m_ALS, newdata = getData(eval_scheme, "known"), type = "ratings")
predict_time <- toc(quiet = TRUE)
Training <- round(training_time$toc - training_time$tic, 2)
Prediction <- round(predict_time$toc - predict_time$tic, 2)
assess <- data.frame(Method = "recommenderlab", Training = round(training_time$toc - training_time$tic, 10), Prediction = round(predict_time$toc - predict_time$tic, 2))
# Evaulate Accuracy
eval_ALS <- calcPredictionAccuracy(predALS, getData(eval_scheme, "unknown"))
kable(eval_ALS,caption = "recommenderlab - Error Report") %>%
kable_styling(bootstrap_options = c("striped", "hover", "condensed", "responsive")) %>%
row_spec(0, bold = T, color = "white", background = "#ea7872") | x | |
|---|---|
| RMSE | 0.9477607 |
| MSE | 0.8982504 |
| MAE | 0.7571143 |
#Spark Connection
sc <- spark_connect(master = "local")
#Assign the dataset to Spark
spark_df <- ratings
#Building Train and Test dataset
train <- sample(x = c(TRUE, FALSE), size = nrow(spark_df),
replace = TRUE, prob = c(0.8, 0.2))
train_df <- spark_df[train, ]
test_df <- spark_df[!train, ]
#Copy function to Spark
spark_train <- sdf_copy_to(sc, train_df, "train_ratings", overwrite = TRUE)
spark_test <- sdf_copy_to(sc, test_df, "test_ratings", overwrite = TRUE)
tic()
sparkALS <- ml_als(spark_train, max_iter = 5, nonnegative = TRUE,
rating_col = "rating", user_col = "userId", item_col = "movieId")
train_time <- toc(quiet = TRUE)
tic()
sparkPred <- sparkALS$.jobj %>%
invoke("transform", spark_dataframe(spark_test)) %>% collect()
predict_time <- toc(quiet = TRUE)
#Get Time
assess <- rbind(assess, data.frame(Method = "Spark", Training = round(train_time$toc -
train_time$tic, 5), Prediction = round(predict_time$toc - predict_time$tic, 2)))
sparkPred <- sparkPred[!is.na(sparkPred$prediction), ]
#Calculate error
mse_spark <- mean((sparkPred$rating - sparkPred$prediction)^2)
rmse_spark <- sqrt(mse_spark)
mae_spark <- mean(abs(sparkPred$rating - sparkPred$prediction))
#Disconnect
spark_disconnect(sc)#Building accuracy table
Type <- c("RMSE", "MSE", "MAE")
value <- c(rmse_spark, mse_spark, mae_spark)
df <- data.frame(Type, value)
kable(df,caption = "sparklyr - Error Report") %>%
kable_styling(bootstrap_options = c("striped", "hover", "condensed", "responsive")) %>%
row_spec(0, bold = T, color = "white", background = "#ea7872") | Type | value |
|---|---|
| RMSE | 0.9267318 |
| MSE | 0.8588317 |
| MAE | 0.7405138 |
acc <- rbind(eval_ALS, data.frame(RMSE = rmse_spark, MSE = mse_spark, MAE = mae_spark))
rownames(acc) <- c("recommenderlab ALS", "Spark ALS")
kable(acc,caption = "Error Accuracy Comparison") %>%
kable_styling(bootstrap_options = c("striped", "hover", "condensed", "responsive")) %>%
row_spec(0, bold = T, color = "white", background = "#ea7872") | RMSE | MSE | MAE | |
|---|---|---|---|
| recommenderlab ALS | 0.9477607 | 0.8982504 | 0.7571143 |
| Spark ALS | 0.9267318 | 0.8588317 | 0.7405138 |
kable(assess,caption = "Performance Comparison") %>%
kable_styling(bootstrap_options = c("striped", "hover", "condensed", "responsive")) %>%
row_spec(0, bold = T, color = "white", background = "#ea7872") | Method | Training | Prediction | |
|---|---|---|---|
| elapsed | recommenderlab | 0.02 | 54.66 |
| elapsed1 | Spark | 8.97 | 2.61 |