The goal of this project is to give you practice beginning to work with a distributed recommender system. It is sufficient for this assignment to build out your application on a single node.
Please include in your conclusion: For your given recommender system’s data, algorithm(s), and (envisioned) implementation, at what point would you see moving to a distributed platform such as Spark becoming necessary?
We use the ml-latest-small.zip dataset from the MovieLens site, which describes 5-star rating and free-text tagging activity from MovieLens, a movie recommendation service. It contains 100836 ratings and 3683 tag applications across 9742 movies. These data were created by 610 users between March 29, 1996 and September 24, 2018; the dataset itself was generated on September 26, 2018.
Citation: F. Maxwell Harper and Joseph A. Konstan. 2015. The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems (TiiS) 5, 4: 19:1-19:19. https://doi.org/10.1145/2827872
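For completeness, the original archive can also be downloaded directly from GroupLens. This is a small sketch that is not run in this report; the URL and folder name are the standard ml-latest-small locations and are assumptions, since the code below reads the CSVs from GitHub instead.

# download and read the original MovieLens archive (not run here)
url <- "https://files.grouplens.org/datasets/movielens/ml-latest-small.zip"
download.file(url, destfile = "ml-latest-small.zip")
unzip("ml-latest-small.zip")
ratings_raw <- read.csv("ml-latest-small/ratings.csv")
nrow(ratings_raw)  # should match the 100836 ratings quoted above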
The following R packages are used (loaded here so that the later code chunks run):

library(recommenderlab)  # rating matrices, evaluation schemes, IBCF/ALS recommenders
library(dplyr)           # data manipulation
library(reshape2)        # acast() for reshaping ratings into a user-movie matrix
library(kableExtra)      # formatted HTML tables
library(tictoc)          # simple timing
library(sparklyr)        # R interface to Apache Spark
Data is loaded from GitHub, and the userId, movieId, and rating columns are selected to build a user-by-movie matrix of class realRatingMatrix. The ratings file itself contains no NA values, but the resulting matrix is sparse: no user has rated more than a small fraction of the movies, and not all of those ratings will be relevant.
ratings <- read.csv("https://raw.githubusercontent.com/samriti0202/DATA612-RecommenderSystems/master/Project2/ratings.csv")
titles <- read.csv("https://raw.githubusercontent.com/samriti0202/DATA612-RecommenderSystems/master/Project2/movies.csv")
ratings <- ratings %>% select(userId, movieId, rating)
# convert the ratings data frame into a userId x movieId matrix
ratingDT <- acast(ratings, userId~movieId, value.var="rating")
# convert the matrix into a realRatingMatrix using the recommenderlab package
ratingDT <- as(as.matrix(ratingDT), "realRatingMatrix")
dim(ratingDT)
## [1] 610 9724
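As a quick check of that sparsity, the sketch below uses recommenderlab accessors; the expected value of roughly 1.7% follows from 100836 ratings spread over the 610 x 9724 matrix reported above.

# share of user-movie cells that actually contain a rating
n_ratings <- length(getRatings(ratingDT))
density <- n_ratings / (nrow(ratingDT) * ncol(ratingDT))
round(100 * density, 2)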
As a rule of thumb, we start with users who have rated more than 100 movies and movies that have been rated more than 100 times; those are the ones we take initially.
ratings_movies <- ratingDT[rowCounts(ratingDT)>100, colCounts(ratingDT)>100]
dim(ratings_movies)
## [1] 245 134
The dataset is now smaller, but it is still fairly large, and we may need an even smaller subset for the ALS evaluation. Let us first run the evaluation using the ALS algorithm and then compare against a recommender built with Apache Spark.
We take a subset of the relevant data, as the memory footprint was too high and it was taking too long to build the recommender model.
rating_movies <- as(ratings_movies, "realRatingMatrix")
rm(ratingDT)  # drop the full rating matrix to free memory
set.seed(88)
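# 80/20 train/test split; given = -1 supplies all but one rating per test user and holds the remaining one out; ratings of 3 or above count as good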
eval_sets <- evaluationScheme(data = rating_movies, method = "split", train = 0.8, given = -1, goodRating = 3, k = 1)
# IBCF: item-based collaborative filtering with default parameters
eval_recommender_ibcf <- Recommender(data = getData(eval_sets, "train"), method = "IBCF", parameter = NULL)
eval_prediction_ibcf <- predict(object = eval_recommender_ibcf, newdata = getData(eval_sets, "known"), n = 10, type = "ratings")
calcPredictionAccuracy(x = eval_prediction_ibcf, data = getData(eval_sets, "unknown"), byUser = FALSE)
## RMSE MSE MAE
## 0.8860376 0.7850626 0.6901797
We will use the Alternating Least Squares (ALS) model as a benchmark to compare the plain recommenderlab implementation with the Spark-based one.
set.seed(88)
tic()
modelALS <- Recommender(getData(eval_sets, "train"), method = "ALS")
train_time <- toc(quiet = TRUE)
# Predicting
tic()
predALS <- predict(modelALS, newdata = getData(eval_sets, "known"), type = "ratings")
predict_time <- toc(quiet = TRUE)
Training <- round(train_time$toc - train_time$tic, 2)
Predicting <- round(predict_time$toc - predict_time$tic, 2)
timing <- data.frame(Method = "recommenderlab", Training = Training, Predicting = Predicting)
# Evaluate accuracy
accALS <- calcPredictionAccuracy(predALS, getData(eval_sets, "unknown"))
accALS
## RMSE MSE MAE
## 0.7871081 0.6195392 0.6329487
Apache Spark ML implements alternating least squares (ALS) for collaborative filtering, a very popular algorithm for making recommendations.
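For reference, this is the standard ALS formulation (not specific to either implementation used here): each user u is given a factor vector $p_u$ and each movie i a factor vector $q_i$, chosen to minimize the regularized squared error over the observed ratings, alternating between solving for the user factors with the item factors fixed and vice versa.

$$\min_{P,Q} \sum_{(u,i)\ \text{observed}} \big(r_{ui} - p_u^{\top} q_i\big)^2 + \lambda \Big( \sum_u \lVert p_u \rVert^2 + \sum_i \lVert q_i \rVert^2 \Big)$$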
sc <- spark_connect(master = "local")
spark_df <- ratings
train <- sample(x = c(TRUE, FALSE), size = nrow(spark_df),
replace = TRUE, prob = c(0.8, 0.2))
train_df <- spark_df[train, ]
test_df <- spark_df[!train, ]
spark_train <- sdf_copy_to(sc, train_df, "train_ratings", overwrite = TRUE)
spark_test <- sdf_copy_to(sc, test_df, "test_ratings", overwrite = TRUE)
tic()
sparkALS <- ml_als(spark_train, max_iter = 5, nonnegative = TRUE,
rating_col = "rating", user_col = "userId", item_col = "movieId")
train_time <- toc(quiet = TRUE)
tic()
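# apply the fitted Spark ALS model to the held-out test set and collect the predictions back into an R data frame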
sparkPred <- sparkALS$.jobj %>%
invoke("transform", spark_dataframe(spark_test)) %>%
collect()
predict_time <- toc(quiet = TRUE)
timing <- rbind(timing, data.frame(Method = "Spark",
    Training = round(train_time$toc - train_time$tic, 2),
    Predicting = round(predict_time$toc - predict_time$tic, 2)))
sparkPred <- sparkPred[!is.na(sparkPred$prediction), ] # Remove NaN due to data set splitting
# Calculate error
mseSpark <- mean((sparkPred$rating - sparkPred$prediction)^2)
rmseSpark <- sqrt(mseSpark)
maeSpark <- mean(abs(sparkPred$rating - sparkPred$prediction))
# Disconnect
spark_disconnect(sc)
## NULL
From the RMSE, MSE, and MAE values we can see that the recommenderlab ALS model scores slightly better than the Spark ALS model, but the two are relatively close. (Note that the Spark model was trained on the full, unfiltered ratings data with only 5 iterations, while the recommenderlab model used the filtered subset, so the comparison is approximate.)
accuracy <- rbind(accALS, data.frame(RMSE = rmseSpark, MSE = mseSpark, MAE = maeSpark))
rownames(accuracy) <- c("recommenderlab ALS", "Spark ALS")
knitr::kable(accuracy, format = "html") %>%
kableExtra::kable_styling(bootstrap_options = c("striped", "hover"))
| | RMSE | MSE | MAE |
|---|---|---|---|
| recommenderlab ALS | 0.7871081 | 0.6195392 | 0.6329487 |
| Spark ALS | 0.8782914 | 0.7713958 | 0.6739397 |
We can also see a difference in runtime: even though Spark was run only locally, it appears to make better use of the available resources and predicted faster than the traditional method, although training was slower. Spark distributes the work across the available processing resources, which speeds up prediction.
knitr::kable(timing, format = "html", row.names = FALSE) %>%
kableExtra::kable_styling(bootstrap_options = c("striped", "hover"))
| Method | Training (sec) | Predicting (sec) |
|---|---|---|
| recommenderlab | 0.02 | 5.76 |
| Spark | 9.41 | 2.86 |
Even running just a local instance, Spark clearly improved prediction time, and this kind of scaling is the biggest advantage of distributed processing.
- Data is growing faster than processing speeds, and the solution is to parallelize computation on large clusters, an approach now widely used in both enterprise and web settings.
- Spark runs in the cloud against huge numbers of data points and helps process data at that scale.
- Spark Streaming runs a computation as a series of very small, deterministic batch jobs.
- Data-flow engines are becoming an important platform for numerical algorithms.

All of the above factors make distributed computing with Spark necessary once the data and models outgrow what a single machine can handle in reasonable time; the sketch below shows how the workflow above would move to a cluster.
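As a concrete illustration of that transition, the sketch below points the same sparklyr workflow at a YARN-managed cluster instead of the local master used above. The executor settings are hypothetical placeholders, not values tested in this report.

# connect sparklyr to a cluster rather than a local master (hypothetical settings)
library(sparklyr)
conf <- spark_config()
conf$spark.executor.instances <- 4     # hypothetical: number of executors
conf$spark.executor.memory <- "4g"     # hypothetical: memory per executor
sc <- spark_connect(master = "yarn", config = conf)
# the same sdf_copy_to() / ml_als() steps shown above would then run
# distributed across the cluster's executors
spark_disconnect(sc)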