Recommender System on Spark
Project Objectives
The goal of this project is to give you practice beginning to work with a distributed recommender system. It is sufficient for this assignment to build out your application on a single node.
Adapt one of your recommendation systems to work with Apache Spark and compare the performance with your previous iteration. Consider the efficiency of the system and the added complexity of using Spark. You may complete the assignment using PySpark (Python), SparkR (R), sparklyr (R), or Scala.
Please include in your conclusion: For your given recommender system’s data, algorithm(s), and (envisioned) implementation, at what point would you see moving to a distributed platform such as Spark becoming necessary?
You may work on any platform of your choosing, including Databricks Community Edition or in local mode. You are encouraged but not required to work in a small group on this project.
Environment setup.
While researching installation procedures, my partner and I set up Spark in two different ways:
- Shovan installed standalone Spark locally on Windows, set SPARK_HOME in the path, and did the necessary troubleshooting along the way.
- Forhad installed sparklyr and used spark_install() to set up the environment, again troubleshooting along the way (a minimal sketch of this route follows the list).
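For reproducibility, here is a minimal sketch of the sparklyr route (the exact steps on our machines differed slightly; spark_install() downloads a local Spark distribution that spark_connect() can then use):

# sketch: set up a local Spark for use with sparklyr
install.packages("sparklyr")            # once per machine
library(sparklyr)
spark_install()                         # download a local Spark distribution
sc <- spark_connect(master = "local")   # smoke-test the installation
spark_disconnect(sc)                    # close the test connection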
Libraries
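The library-loading chunk is not shown in this extract; the sketch below lists the packages the later chunks appear to rely on (inferred from the functions they call):

# packages used in this report (inferred from the code below)
library(recommenderlab)  # MovieLense data, evaluationScheme(), Recommender()
library(sparklyr)        # spark_connect(), sdf_copy_to(), ml_als()
library(dplyr)           # %>% pipes and mutate()
library(tictoc)          # tic()/toc() run-time logging
library(knitr)           # kable()
library(kableExtra)      # kable_styling(), add_header_above()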
Load Data
In our examples we will use the MovieLense dataset, which contains the ratings that users gave to movies. Let’s load the data and take a look.
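A sketch of the loading step (assuming the MovieLense data that ships with recommenderlab, which the chunks below rely on):

# load the MovieLense ratings shipped with recommenderlab (sketch)
data("MovieLense", package = "recommenderlab")
head(as(MovieLense, "data.frame"))  # first few user/item/rating rows
dim(MovieLense)                     # users x movies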
## user item rating
## 1 1 Toy Story (1995) 5
## 453 1 GoldenEye (1995) 3
## 584 1 Four Rooms (1995) 4
## 674 1 Get Shorty (1995) 3
## 883 1 Copycat (1995) 3
## 969 1 Shanghai Triad (Yao a yao yao dao waipo qiao) (1995) 5
## [1] 943 1664
Each row of MovieLense corresponds to a user, and each column corresponds to a movie. There are 943 x 1664 = 1,569,152 possible combinations of a user and a movie, so storing the complete matrix would require 1,569,152 cells. However, not every user has watched every movie: there are fewer than 100,000 ratings, so the matrix is sparse. The recommenderlab package allows us to store it in a compact way.
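To see the sparsity directly, we can compare the number of stored ratings with the number of possible user-movie pairs (a small sketch using recommenderlab accessors):

# how sparse is the rating matrix? (sketch)
n_possible <- prod(dim(MovieLense))      # 943 * 1664 = 1,569,152 user-movie pairs
n_ratings  <- sum(rowCounts(MovieLense)) # ratings actually stored (< 100,000)
n_ratings / n_possible                   # fraction of cells filled, well under 10%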
Explanation of ALS.
In the following code chunks we build ALS models with both recommenderlab and Spark, so a brief explanation is in order. Alternating Least Squares (ALS) is a matrix factorization algorithm that executes in a parallel fashion. It is implemented in Apache Spark ML and built for large-scale collaborative filtering problems. ALS does a good job of handling the scalability and sparseness of the ratings data, and it is simple and scales well to very large datasets. Documentation on ALS is available at: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/ml/als.html#:~:text=Examples-,Description,and%20is%20called%20latent%20factors.
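To make the "alternating" idea concrete, here is a toy sketch of ALS on a tiny hand-made ratings matrix. It is purely illustrative and is not the recommenderlab or Spark implementation: each sweep fixes one factor matrix and solves a small ridge regression for every row of the other.

# toy ALS sketch: factor a tiny ratings matrix R into user factors P and item factors Q
set.seed(1)
R <- matrix(c(5,  3, NA, 1,
              4, NA, NA, 1,
              1,  1, NA, 5), nrow = 3, byrow = TRUE)  # NA = unrated
k      <- 2                                  # number of latent factors
lambda <- 0.1                                # regularization strength
P <- matrix(rnorm(nrow(R) * k), nrow(R), k)  # user factors (random start)
Q <- matrix(rnorm(ncol(R) * k), ncol(R), k)  # item factors (random start)
for (iter in 1:10) {
  # fix Q, solve a ridge regression for each user's factor vector
  for (u in 1:nrow(R)) {
    obs <- which(!is.na(R[u, ]))
    Qo  <- Q[obs, , drop = FALSE]
    P[u, ] <- solve(t(Qo) %*% Qo + lambda * diag(k), t(Qo) %*% R[u, obs])
  }
  # fix P, solve a ridge regression for each item's factor vector
  for (i in 1:ncol(R)) {
    obs <- which(!is.na(R[, i]))
    if (length(obs) == 0) next               # item with no ratings keeps its start
    Po  <- P[obs, , drop = FALSE]
    Q[i, ] <- solve(t(Po) %*% Po + lambda * diag(k), t(Po) %*% R[obs, i])
  }
}
round(P %*% t(Q), 1)  # reconstruction approximates R on the observed entries

Because every user (and every item) update is independent within a sweep, these inner loops are exactly what a distributed engine like Spark can run in parallel across partitions of the data.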
ALS using Recommenderlab.
# split dataset
set.seed(156)
scheme <- MovieLense %>%
  # 80/20 split; each test user keeps 5 ratings as "known", the rest are held out;
  # ratings of 3 or higher count as "good"
  evaluationScheme(method = "split", train = 0.8, given = 5, goodRating = 3)
# Training
tic("ALS Model Training- recommenderlab")
als_model <- Recommender(getData(scheme, "train"), method = "ALS")
toc(log = TRUE, quiet = TRUE)
# Predicting
tic("ALS Model Predicting- recommenderlab")
prediction <- predict(als_model, getData(scheme, "known"), type = "ratings")
toc(log = TRUE, quiet = TRUE)
# get accuracy score
evaluation <- calcPredictionAccuracy(prediction, getData(scheme, "unknown"))
ALS using Spark.
# convert data based on sparklyr requirements
sdf_MovieLense <- MovieLense %>%
  as(., "data.frame") %>%
  # Spark's ALS needs numeric user and item ids, so convert the factor levels
  mutate(user = as.numeric(user),
         item = as.numeric(item))
# connect to spark locally
sc <- spark_connect(master = "local")
# copy data to spark
sdf_rating_matrix <- sdf_copy_to(sc, sdf_MovieLense, "sdf_rating_matrix", overwrite = TRUE)
# split dataset in spark
partitioned <- sdf_rating_matrix %>%
  sdf_random_split(training = 0.8, testing = 0.2)
# Training
tic("ALS Model Training- sparklyr")
sdf_als_model <- ml_als(partitioned$training, max_iter = 5)
toc(log = TRUE, quiet = TRUE)
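# Note (sketch): ml_als() also exposes the usual ALS hyperparameters; the call
# above keeps the package defaults except max_iter. Spelled out, with values
# shown purely for illustration:
#   ml_als(partitioned$training,
#          rating_col = "rating", user_col = "user", item_col = "item",
#          rank = 10,        # number of latent factors
#          reg_param = 0.1,  # regularization strength (lambda)
#          max_iter = 5)     # alternating sweeps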
# predicting
tic("ALS Model predicting- sparklyr")
prediction <- ml_transform(sdf_als_model, partitioned$testing) %>% collect()
toc(log = TRUE, quiet = TRUE)
Please note that in the following code chunk we disconnect from Spark, because we do not need to remain connected from this point onwards. It is also good practice to disconnect the session when it is no longer needed: on a cloud platform such as AWS or GCP the resource may be billed for every second it stays up.
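That disconnect chunk is not visible in this extract; a minimal sketch of it, assuming the connection object is the sc created above:

# close the local Spark session now that predictions have been collected
spark_disconnect(sc)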
Calculating and comparing accuracies.
# function to calculate RMSE
rmse <- function(o, p) {
  # o = observed ratings, p = predicted ratings; result rounded to 2 decimals
  round(sqrt(mean((o - p)^2, na.rm = TRUE)), 2)
}
# rmse for both models
rmse1 <- evaluation[[1]]
rmse2 <- rmse(prediction$rating, prediction$prediction)
# print the score
kable(cbind(rmse1, rmse2), col.names = c("recommenderlab", "sparklyr")) %>%
  kable_styling("striped", full_width = F) %>%
  add_header_above(c("RMSE" = 2))

| recommenderlab | sparklyr |
|---|---|
| 1.07954 | 0.94 |
We arrived at a lower, and therefore better, RMSE with Spark. Note, however, that the two figures come from different evaluation schemes (recommenderlab's given-5 split versus a random 80/20 split in Spark), so the comparison is indicative rather than exact.
Runtime comparison.
# Run time comparison
log <- as.data.frame(unlist(tic.log(format = TRUE)))
colnames(log) <- c("Run Time")
knitr::kable(log, format = "html") %>%
  kableExtra::kable_styling(bootstrap_options = c("striped", "hover"))

| Run Time |
|---|
| ALS Model Training- recommenderlab: 0.03 sec elapsed |
| ALS Model Predicting- recommenderlab: 34.47 sec elapsed |
| ALS Model Training- sparklyr: 4.62 sec elapsed |
| ALS Model predicting- sparklyr: 2.07 sec elapsed |
Conclusion.
- We arrived at a lower, and therefore better, RMSE with Spark.
- While training is faster with recommenderlab (0.03 s vs. 4.62 s), prediction is much faster with Spark (2.07 s vs. 34.47 s).
- As noted above, ALS is a matrix factorization algorithm that executes in a parallel fashion: several tasks each process a shard of the original data, which calls for a distributed system such as Hadoop or Spark. So once the ratings data outgrows a single node's memory or acceptable runtime, and wherever ALS processing is needed at that scale, we would choose distributed processing.