The goal of this project is to practice working with a distributed recommender system. It adapts one of the recommendation systems built previously and runs it on Spark. This markdown discusses the algorithms and implementation, as well as when moving to a Spark platform becomes worthwhile.
library(tidyverse)
library(readr)
library(sqldf)
library(dplyr)
library(tidyr)
library(tidymodels)
library(tinytex)
library(recommenderlab)
library(kableExtra)
library(gridExtra)
library(sparklyr)
library(tictoc)
library(rJava)
# Load the MovieLense ratings that ship with recommenderlab and keep only
# users and movies with more than 100 ratings each.
data_package <- data(package = "recommenderlab")
data("MovieLense")
ratings_movies <- MovieLense[rowCounts(MovieLense) > 100,
                             colCounts(MovieLense) > 100]
# Evaluation parameters: 8-fold cross-validation, 10 ratings given per test
# user, ratings of 3 or higher counted as "good", and 10 recommendations
# requested per user at prediction time.
train_percent <- 0.8
kept_items <- 10
rating_threshold <- 3
n_eval <- 8
no_recommendations <- 10
set.seed(2)
eval_sets <- evaluationScheme(data = ratings_movies, method = "cross-validation",
                              train = train_percent, given = kept_items,
                              goodRating = rating_threshold, k = n_eval)
#used to train
recc_train<- getData(eval_sets,'train')
#used to predict
recc_known<-getData(eval_sets,'known')
#used to test
recc_unknown<-getData(eval_sets,'unknown')
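Each fold of the evaluation scheme yields three slices: the training ratings, the 10 "given" ratings per test user (known), and the remaining held-out ratings used for scoring (unknown). As an optional sanity check (a sketch, not part of the original output), their dimensions can be inspected directly:
# Users (rows) and movies (columns) in each slice of the evaluation scheme.
sapply(list(train = recc_train, known = recc_known, unknown = recc_unknown), dim)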
The dataset used for this project is the MovieLens dataset, which can be found at https://grouplens.org/datasets/movielens/. GroupLens collects various datasets and maintains them for individuals to use for free. The specific version of the dataset is “ml-latest-small.zip”. All files relevant to this project can be found in the student’s GitHub repository. The dataset contains over 100,000 movie ratings from different users; a sample can be displayed as shown below. The tictoc R package is used to log how long the ALS recommender takes to train and to produce recommendations.
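As a minimal sketch of such a sample view (the rendered table itself is not reproduced here), the rating matrix can be flattened to user/item/rating triples with recommenderlab's getData.frame() and displayed with the kable/kableExtra functions loaded above:
# Flatten the rating matrix and show the first few user/item/rating rows.
getData.frame(MovieLense) %>%
  head() %>%
  kable() %>%
  kable_styling(full_width = FALSE, position = "center")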
tic()
# Train the ALS recommender locally with recommenderlab.
ALS_eval <- Recommender(recc_train, method = "ALS")
Rlab_traintime<-toc()
## 0.02 sec elapsed
tic()
# Predict ratings for the "known" portion of the test users.
recc_predicted <- predict(object = ALS_eval, newdata = recc_known, n = no_recommendations, type = "ratings")
rlab_predicttime<-toc()
## 5.92 sec elapsed
# Aggregate accuracy (RMSE/MSE/MAE) against the held-out "unknown" ratings.
predacc <- calcPredictionAccuracy(recc_predicted, recc_unknown, byUser = FALSE)
Recommender_ALS<-
data.frame(Method = "recommenderlab",
train_time = (Rlab_traintime$toc- Rlab_traintime$tic),
predict_time = (rlab_predicttime$toc - rlab_predicttime$tic),
RMSE = predacc[1],
MSE = predacc[2],
MAE = predacc[3]
)
Recommender_ALS
## Method train_time predict_time RMSE MSE MAE
## elapsed recommenderlab 0.02 5.92 0.9849568 0.9701399 0.780771
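The same trained model can also return ranked top-N lists rather than predicted ratings, which is the more common use of the no_recommendations parameter. A minimal sketch (topN_example is an illustrative name, and this call is not part of the timed comparison):
# Top-N recommendation lists for the first three test users.
topN_example <- predict(ALS_eval, newdata = recc_known[1:3, ],
                        n = no_recommendations, type = "topNList")
as(topN_example, "list")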
Next, we perform the same computations, but connect to a Spark distributed system before running the evaluation. After establishing a Spark connection, we can fit the same ALS recommender model and record the timings shown in the table below. The RMSEs are very comparable whether computed locally or through Spark; training took considerably longer on Spark than locally, while prediction on Spark was much faster.
This trade-off is beneficial, particularly for large datasets, because fast prediction matters more than fast training: users get near-instant feedback on which items should be recommended to them, while training can happen ahead of time. These systems can be complex and prone to a lot of debugging; personally, working with sparklyr locally led to several Java-related issues on my machine, though that may simply be a product of unfamiliarity.
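As an aside, and purely as a hypothetical sketch (alt_config and the 4G value are illustrative, not from the original analysis), giving the local Spark driver more memory through spark_config() can sometimes sidestep local Java errors; the project itself uses the plain spark_connect() call below.
# Hypothetical alternative connection settings; not used in this project.
alt_config <- spark_config()
alt_config$`sparklyr.shell.driver-memory` <- "4G"
# sc <- spark_connect(master = "local", config = alt_config)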
# Connect to a local Spark instance.
sc <- spark_connect(master = "local")

# Spark ALS needs integer item IDs, so map each movie title to an ID.
movie_id <- data.frame(item = MovieLenseMeta$title,
                       title_ID = as.integer(row.names(MovieLenseMeta)))

# Convert the training ratings to user/item/rating triples, attach the
# numeric IDs, and copy the result into Spark.
spark_train <- getData.frame(recc_train)
spark_train$user <- as.numeric(spark_train$user)
spark_train <- left_join(spark_train, movie_id, by = "item")
sparktrain <- copy_to(sc, spark_train, overwrite = TRUE)

# Do the same for the "known" ratings that the model will predict from.
spark_test <- getData.frame(recc_known)
spark_test$user <- as.numeric(spark_test$user)
spark_test <- left_join(spark_test, movie_id, by = "item")
sparktest <- copy_to(sc, spark_test, overwrite = TRUE)
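An optional check (a sketch, not in the original output) that both tables actually landed in Spark with the expected number of ratings:
# Row counts of the Spark copies of the training and known sets.
sdf_nrow(sparktrain)
sdf_nrow(sparktest)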
tic()
# Fit the ALS model in Spark on the numeric user and title IDs (5 iterations).
Spark_ALS <- ml_als(sparktrain, rating_col = "rating", user_col = "user", item_col = "title_ID", max_iter = 5)
Spark_traintime<-toc()
## 4.81 sec elapsed
tic()
# Apply the fitted ALS model to the test table through the underlying Spark
# object and collect the predictions back into R.
spark_pred <- Spark_ALS$.jobj %>%
  invoke("transform", spark_dataframe(sparktest)) %>%
  collect()
spark_predicttime<-toc()
## 2 sec elapsed
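The call above pushes the test table through the fitted ALS model via the underlying Java object. sparklyr also exposes ml_predict() as a higher-level equivalent that should yield the same prediction column; a minimal sketch, assuming the model and test table defined above (not the timing measured here):
# Equivalent, higher-level prediction path in sparklyr.
spark_pred_alt <- ml_predict(Spark_ALS, sparktest) %>% collect()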
# Drop rows where ALS could not produce a prediction (cold-start users/items give NaN).
spark_pred <- spark_pred[!is.na(spark_pred$prediction), ]
# Accuracy metrics using recommenderlab's MSE/RMSE/MAE helpers.
mseSpark <- MSE(spark_pred$rating, spark_pred$prediction)
rmseSpark <- RMSE(spark_pred$rating, spark_pred$prediction)
maeSpark <- MAE(spark_pred$rating, spark_pred$prediction)
spark_ALS<- data.frame(Method = "Spark",
train_time = (Spark_traintime$toc- Spark_traintime$tic),
predict_time = (spark_predicttime$toc - spark_predicttime$tic),
RMSE = rmseSpark,
MSE = mseSpark,
MAE = maeSpark
)
bind_rows(Recommender_ALS, spark_ALS)[1:3] %>%
  kable() %>%
  kable_styling(full_width = FALSE,
                position = "center",
                bootstrap_options = c("hover", "condensed", "responsive"))
| Method | train_time | predict_time |
|---|---|---|
| recommenderlab | 0.02 | 5.92 |
| Spark | 4.81 | 2.00 |
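Since the discussion above also compares accuracy, the error metrics from both runs can be laid out the same way (a sketch; this table is not part of the original output):
# Side-by-side accuracy metrics for the two ALS implementations.
bind_rows(Recommender_ALS, spark_ALS) %>%
  select(Method, RMSE, MSE, MAE) %>%
  kable() %>%
  kable_styling(full_width = FALSE, position = "center",
                bootstrap_options = c("hover", "condensed", "responsive"))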
spark_disconnect(sc)