The goal of this project is to give you practice beginning to work with a distributed recommender system. It is sufficient for this assignment to build out your application on a single node.
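The code below relies on a handful of R packages; a minimal setup sketch, assuming they are already installed, looks like this:
library(tidyr)          # spread()
library(dplyr)          # %>% pipelines
library(knitr)          # kable()
library(kableExtra)     # kable_styling()
library(recommenderlab) # realRatingMatrix, Recommender(), evaluationScheme()
library(tictoc)         # tic() / toc()
library(sparklyr)       # spark_connect(), sdf_copy_to(), ml_als()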
# Load the ratings data from GitHub and preview the first rows
ratings <- read.csv('https://raw.githubusercontent.com/msdsrep4/DATA-RS/master/ratings.csv')
kable(head(ratings)) %>% kable_styling("striped", full_width = F)
| userId | movieId | rating | timestamp |
|---|---|---|---|
| 1 | 1 | 4 | 964982703 |
| 1 | 3 | 4 | 964981247 |
| 1 | 6 | 4 | 964982224 |
| 1 | 47 | 5 | 964983815 |
| 1 | 50 | 5 | 964982931 |
| 1 | 70 | 3 | 964982400 |
tic()
# Reshape the long ratings table into a wide user x movie matrix and coerce it
# to recommenderlab's realRatingMatrix
rating_matrix <- spread(ratings[,1:3], movieId, rating)
rating_matrix <- as.matrix(rating_matrix[,-1])  # drop the userId column
rating_matrix <- as(rating_matrix, "realRatingMatrix")
# Keep only users and movies with more than 100 ratings
ratings_relevant <- rating_matrix[rowCounts(rating_matrix) > 100, colCounts(rating_matrix) > 100]
ratings_relevant
## 245 x 134 rating matrix of class 'realRatingMatrix' with 13659 ratings.
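Note that ratings_relevant is built above, but the evaluation that follows runs on the full rating_matrix. If the intent were to evaluate only on the filtered users and movies, the same scheme could be pointed at the filtered matrix; a sketch (the results reported below come from the full matrix):
# Alternative: cross-validate on the filtered matrix only
eval_sch_filtered <- evaluationScheme(ratings_relevant, method = "cross-validation",
                                      k = 10, given = 3, goodRating = 3)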
# 10-fold cross-validation: 3 ratings per test user are given as "known",
# the remaining ratings are held out as "unknown"
eval_sch <- evaluationScheme(rating_matrix, method="cross-validation", k=10, given=3, goodRating=3)
# Set train and test sets
train <- getData(eval_sch,"train")
test <- getData(eval_sch,"known")
# Prepare evaluation set
evaluation <- getData(eval_sch, "unknown")
recommender_model <- Recommender(data = train, method = "ALS")
# Predict ratings for the test users' unrated items (n is ignored when type = "ratings")
recom <- predict(recommender_model, newdata = test, n=10, type="ratings")
# Evaluate accuracy against the held-out ratings
# (given and goodRating only matter for top-N evaluation)
eval_accuracy <- calcPredictionAccuracy(x = recom, data = evaluation, given=10, goodRating=3)
kable(eval_accuracy) %>% kable_styling("striped", full_width = F)
| | x |
|---|---|
| RMSE | 1.0958351 |
| MSE | 1.2008545 |
| MAE | 0.8535208 |
elapsed_time1 <- toc(quiet = FALSE)
## 196.596 sec elapsed
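RMSE, MSE, and MAE measure rating-prediction error; recommenderlab can also score the quality of top-N recommendation lists against the same evaluation scheme. A complementary sketch (the list lengths are illustrative):
# Precision/recall and TPR/FPR for top-N lists of increasing length
eval_topn <- evaluate(eval_sch, method = "ALS", type = "topNList", n = c(1, 5, 10))
avg(eval_topn)  # confusion-matrix metrics averaged across the folds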
tic()
# connect to Spark locally
sc <- spark_connect(master = "local")
# Spark data processing: drop the timestamp column and make an 80/20 train/test split
spark_ratings <- ratings[,-4]
# (consider calling set.seed() here for a reproducible split)
train_filter <- sample(x = c(TRUE, FALSE), size = nrow(spark_ratings), replace = TRUE, prob = c(0.8, 0.2))
train_ratings <- spark_ratings[train_filter, ]
test_ratings <- spark_ratings[!train_filter, ]
# Copy both splits into Spark
spark_train <- sdf_copy_to(sc, train_ratings, "train_ratings", overwrite = TRUE)
spark_test <- sdf_copy_to(sc, test_ratings, "test_ratings", overwrite = TRUE)
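The split above is done in R before the data reach Spark. An alternative, assuming the same 80/20 proportions (the seed is arbitrary), is to copy the full table once and let Spark do the split with sdf_random_split():
spark_all <- sdf_copy_to(sc, spark_ratings, "all_ratings", overwrite = TRUE)
splits <- sdf_random_split(spark_all, training = 0.8, test = 0.2, seed = 42)
spark_train_alt <- splits$training
spark_test_alt <- splits$test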
spark_recommender_model <- ml_als(spark_train, max_iter = 5, nonnegative = TRUE,
rating_col = "rating", user_col = "userId", item_col = "movieId")
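ml_als() exposes more of Spark's ALS parameters than are used here. For example, cold_start_strategy = "drop" makes Spark drop rows it cannot score (users or movies unseen during training) instead of returning NaN, which removes the need for the NA filter below. A sketch with a few commonly tuned settings (the model name and the specific values are illustrative):
spark_als_tuned <- ml_als(spark_train,
                          rating_col = "rating", user_col = "userId", item_col = "movieId",
                          rank = 10, reg_param = 0.1, max_iter = 10,
                          nonnegative = TRUE, cold_start_strategy = "drop")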
# Score the test set by invoking transform() on the underlying Spark ALS model
spark_recom <- spark_recommender_model$.jobj %>%
  invoke("transform", spark_dataframe(spark_test)) %>%
  collect()
# ALS cannot score users/movies absent from the training split; drop the resulting NaN predictions
spark_recom <- spark_recom[!is.na(spark_recom$prediction), ]
# Evaluate the model: MSE, RMSE, and MAE on the test-set predictions
spark_mse <- mean((spark_recom$rating - spark_recom$prediction)^2)
spark_rmse <- sqrt(spark_mse)
spark_mae <- mean(abs(spark_recom$rating - spark_recom$prediction))
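sparklyr also offers a higher-level path for the scoring step: ml_predict() produces the predictions without touching the underlying Java object, and ml_regression_evaluator(), given a predictions table, returns the metric directly. A sketch, assuming the spark_als_tuned model from the earlier sketch (fit with cold_start_strategy = "drop", so no NaN rows need filtering):
spark_pred <- ml_predict(spark_als_tuned, spark_test)
spark_rmse_alt <- ml_regression_evaluator(spark_pred, label_col = "rating",
                                          prediction_col = "prediction", metric_name = "rmse")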
# Spark disconnection
spark_disconnect(sc)
elapsed_time2 <- toc(quiet = FALSE)
## 61.139 sec elapsed
# Compare accuracy of the centralized (recommenderlab) and distributed (Spark) models
comparison <- rbind(eval_accuracy, data.frame(RMSE = spark_rmse, MSE = spark_mse, MAE = spark_mae))
rownames(comparison) <- c("Centralized System", "Distributed System")
kable(comparison) %>% kable_styling("striped", full_width = F)
| | RMSE | MSE | MAE |
|---|---|---|---|
| Centralized System | 1.0958351 | 1.2008545 | 0.8535208 |
| Distributed System | 0.8776837 | 0.7703286 | 0.6772179 |
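The elapsed times captured with tictoc can be folded into the same table; a small sketch (toc() returns the start and end timestamps, so their difference is the elapsed seconds; the Runtime_sec column name is only illustrative):
comparison_timed <- comparison
comparison_timed$Runtime_sec <- c(elapsed_time1$toc - elapsed_time1$tic,
                                  elapsed_time2$toc - elapsed_time2$tic)
kable(comparison_timed) %>% kable_styling("striped", full_width = F)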
Installing Spark on my local machine (a MacBook) was as challenging and complicated as I had somewhat expected. Even after installation, working with sparklyr was not smooth sailing, and it is not easy to translate a regular model function into its Spark equivalent. After trying several variations, I finally got Spark to work; I needed to switch from Spark version 2.4.6 to 3.0.0.
In this experiment, the Spark implementation of the ALS-based recommender model performed better than recommenderlab in both speed and accuracy, although the two pipelines use different evaluation splits, so the accuracy figures are only roughly comparable. As the elapsed times show, Spark ran roughly three times faster than recommenderlab (about 61 seconds versus about 197 seconds).
I think Spark is most useful when the system needs to be accessed online and when there is a huge amount of data to process.