Objective:

The goal of this project is to give you practice beginning to work with a distributed recommender system. It is sufficient for this assignment to build out your application on a single node.

Please include in your conclusion: For your given recommender system’s data, algorithm(s), and (envisioned) implementation, at what point would you see moving to a distributed platform such as Spark becoming necessary?

Solution:

We use the ml-latest-small.zip dataset from the MovieLens site, which describes 5-star rating and free-text tagging activity from MovieLens, a movie recommendation service. It contains 100,836 ratings and 3,683 tag applications across 9,742 movies. The data were created by 610 users between March 29, 1996 and September 24, 2018, and the dataset itself was generated on September 26, 2018.

Citation: F. Maxwell Harper and Joseph A. Konstan. 2015. The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems (TiiS) 5, 4: 19:1-19:19. https://doi.org/10.1145/2827872

Libraries used

recommenderlab
dplyr
reshape2
kableExtra
tictoc
sparklyr

Data loading and preparation of the relevant dataset

Data is loaded from GitHub, and the relevant columns are selected to build a userId-by-movieId matrix, which is then converted to an object of class realRatingMatrix. The matrix is very sparse: most of its entries are NA, since each user has rated only a small fraction of the 9,724 movies, and realRatingMatrix stores only the ratings that were actually given.

# load the libraries used throughout this analysis
library(recommenderlab)
library(dplyr)
library(reshape2)
library(kableExtra)
library(tictoc)
library(sparklyr)

ratings <- read.csv("https://raw.githubusercontent.com/samriti0202/DATA612-RecommenderSystems/master/Project2/ratings.csv")

titles <- read.csv("https://raw.githubusercontent.com/samriti0202/DATA612-RecommenderSystems/master/Project2/movies.csv")

ratings <- ratings %>% select(userId, movieId, rating) 

# convert the ratings data frame into a user-by-movie matrix (one row per userId, one column per movieId)
ratingDT <- acast(ratings, userId ~ movieId, value.var = "rating")

# convert the matrix into a realRatingMatrix using the recommenderlab package
ratingDT <- as(ratingDT, "realRatingMatrix")
dim(ratingDT)
## [1]  610 9724
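
A quick sanity check on sparsity (a minimal sketch using the ratingDT object created above):

# fraction of the user-item grid that actually contains a rating
n_ratings <- sum(rowCounts(ratingDT))              # 100836 observed ratings
n_ratings / (nrow(ratingDT) * ncol(ratingDT))      # ~0.017, i.e. only ~1.7% of cells are filled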

Data preparation

  1. Select the relevant data
  2. Normalize the data (shown explicitly after the filtering step below)

As a rule of thumb, we begin with users who have rated more than 100 movies and with movies that have been rated more than 100 times; those are the records we take initially.

ratings_movies <- ratingDT[rowCounts(ratingDT)>100, colCounts(ratingDT)>100]

dim(ratings_movies)
## [1] 245 134
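
Step 2, normalization, is applied internally by recommenderlab's models (IBCF, for example, centers ratings by default), but it can also be done explicitly; a minimal sketch:

# subtract each user's mean rating to remove individual rating-scale bias
ratings_norm <- normalize(ratings_movies, method = "center")
# method = "Z-score" additionally divides by each user's standard deviation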

The dataset is now much smaller, but it is still fairly large, and we may need an even smaller subset for the ALS evaluation. Let us first run the evaluation with recommenderlab's algorithms and then compare against a recommendation system built with Apache Spark.

Building the Item-Based Collaborative Filtering (IBCF) model and computing its RMSE

We take a subset of the relevant dataset, as the memory footprint was too high and building the recommender model was taking too long.

rating_movies <- ratings_movies  # already a realRatingMatrix, so no conversion is needed
set.seed(88)
# 80/20 split; given = -1 holds out one rating per test user (all-but-one scheme)
eval_sets <- evaluationScheme(data = rating_movies, method = "split", train = 0.8, given = -1, goodRating = 3, k = 1)

#IBCF
eval_recommender_ibcf <- Recommender(data = getData(eval_sets, "train"), method = "IBCF", parameter = NULL)
eval_prediction_ibcf <- predict(object = eval_recommender_ibcf, newdata = getData(eval_sets, "known"), n = 10, type = "ratings")
calcPredictionAccuracy(x = eval_prediction_ibcf, data = getData(eval_sets, "unknown"), byUser = FALSE)
##      RMSE       MSE       MAE 
## 0.8860376 0.7850626 0.6901797
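
The same fitted IBCF model can also return top-N recommendation lists instead of rating predictions; a minimal sketch:

# top-10 recommendations (as movieIds) for the test users
ibcf_topn <- predict(eval_recommender_ibcf, newdata = getData(eval_sets, "known"), n = 10, type = "topNList")
as(ibcf_topn, "list")[1:2]  # inspect the lists for the first two test users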

Building the ALS model

We will use the Alternating Least Squares (ALS) model as a benchmark, comparing the plain recommenderlab implementation against the same algorithm run through Spark.
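
Before fitting it, the default hyper-parameters recommenderlab uses for ALS (number of latent factors, regularization, iterations) can be inspected; a quick check:

# inspect recommenderlab's registered ALS method and its default parameters
recommenderRegistry$get_entry("ALS", dataType = "realRatingMatrix")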

set.seed(88)

tic()
modelALS <- Recommender(getData(eval_sets, "train"), method = "ALS")
train_time <- toc(quiet = TRUE)

# Predicting
tic()
predALS <- predict(modelALS, newdata = getData(eval_sets, "known"), type = "ratings")
predict_time <- toc(quiet = TRUE)

Training <- round(train_time$toc - train_time$tic, 2)
Predicting <- round(predict_time$toc - predict_time$tic, 2)

timing <- data.frame(Method = "recommenderlab", Training = Training, Predicting = Predicting)

# Evaluate accuracy
accALS <- calcPredictionAccuracy(predALS, getData(eval_sets, "unknown"))

accALS
##      RMSE       MSE       MAE 
## 0.7871081 0.6195392 0.6329487

Building Alternating Least Squares (ALS) using Spark ML

Apache Spark ML implements alternating least squares (ALS) for collaborative filtering, a very popular algorithm for making recommendations.
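
Concretely, ALS learns a user-factor matrix $U$ and an item-factor matrix $V$ by minimizing the regularized squared error over the set $K$ of observed ratings, alternating between solving for $U$ with $V$ fixed and for $V$ with $U$ fixed (each step is an ordinary least-squares problem):

$$\min_{U,V}\; \sum_{(u,i)\in K}\left(r_{ui}-u_u^{\top}v_i\right)^2+\lambda\left(\sum_u\lVert u_u\rVert^2+\sum_i\lVert v_i\rVert^2\right)$$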

sc <- spark_connect(master = "local")
spark_df <- ratings
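
spark_connect(master = "local") assumes a local Spark distribution is already available; if it is not, sparklyr can download and install one first (a one-time setup sketch; the version shown is illustrative, not necessarily the one used here):

# run once if no local Spark distribution is installed; version is illustrative
spark_install(version = "2.4")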


# random 80/20 train/test split of the ratings data frame
train <- sample(x = c(TRUE, FALSE), size = nrow(spark_df),
                replace = TRUE, prob = c(0.8, 0.2))

train_df <- spark_df[train, ]

test_df <- spark_df[!train, ]

spark_train <- sdf_copy_to(sc, train_df, "train_ratings", overwrite = TRUE)

spark_test <- sdf_copy_to(sc, test_df, "test_ratings", overwrite = TRUE)

tic()



sparkALS <- ml_als(spark_train, max_iter = 5, nonnegative = TRUE, 
                   rating_col = "rating", user_col = "userId", item_col = "movieId")



train_time <- toc(quiet = TRUE)

tic()

# score the test set by invoking transform on the underlying Java model object
# (newer sparklyr versions offer the equivalent ml_predict(sparkALS, spark_test))
sparkPred <- sparkALS$.jobj %>%
  invoke("transform", spark_dataframe(spark_test)) %>%
  collect()

predict_time <- toc(quiet = TRUE)

timing <- rbind(timing, data.frame(Method = "Spark", Training = round(train_time$toc - 
                                                                        train_time$tic, 2), Predicting = round(predict_time$toc - predict_time$tic, 2)))

sparkPred <- sparkPred[!is.na(sparkPred$prediction), ] # remove NaN predictions for users/items that fell only into the test split (cold start)

# Calculate error
mseSpark <- mean((sparkPred$rating - sparkPred$prediction)^2)

rmseSpark <- sqrt(mseSpark)

maeSpark <- mean(abs(sparkPred$rating - sparkPred$prediction))

# Disconnect
spark_disconnect(sc)
## NULL

Analysis

Comparing the RMSE, MSE, and MAE values below, the recommenderlab ALS is slightly more accurate than the Spark ALS in this run, but the two are relatively close.

accuracy <- rbind(accALS, data.frame(RMSE = rmseSpark, MSE = mseSpark, MAE = maeSpark))

rownames(accuracy) <- c("recommenderlab ALS", "Spark ALS")

knitr::kable(accuracy, format = "html") %>%
  kableExtra::kable_styling(bootstrap_options = c("striped", "hover"))
                     RMSE       MSE        MAE
recommenderlab ALS   0.7871081  0.6195392  0.6329487
Spark ALS            0.8782914  0.7713958  0.6739397

We do see a difference in performance. Even though Spark ran only locally, it appears to make better use of resources: prediction was faster than with the traditional method, although training was slower. Spark distributes jobs across the available processor cores, which speeds the work up.

knitr::kable(timing, format = "html", row.names = FALSE) %>%
  kableExtra::kable_styling(bootstrap_options = c("striped", "hover"))
Method          Training  Predicting
recommenderlab  0.02      5.76
Spark           9.41      2.86

Summary

Even running just a local instance, Spark improved prediction performance. This is the biggest advantage of distributed processing.

Data is growing faster than processing speeds, and the solution is to parallelize on large clusters, an approach now in wide use across enterprises and the web industry.

Spark runs in the cloud with huge numbers of data points and helps process large datasets.

Spark Streaming runs a computation as a series of very small, deterministic batch jobs.

Data-flow engines are becoming an important platform for numerical algorithms.

All of the above factors make distributed computing with Spark necessary. For this recommender, that point arrives when the ratings matrix and models no longer fit in a single machine's memory, or when training time on one node becomes impractical, for example when moving from this 100,836-rating subset to the full MovieLens data with millions of ratings.