This project covers basic Spark setup in the R environment. It relies on the sparklyr package and runs all Spark code in local mode. It is meant to demonstrate basic functionality in preparation for the final project; as such, it can only evaluate some of the features and benefits of Spark.
# Required libraries
library(recommenderlab) # Matrix/recommender functions
library(dplyr) # Data manipulation
library(tidyr) # Data manipulation
library(ggplot2) # Plotting
library(tictoc) # Operation timing
library(sparklyr) # Spark processing
# Set up data frame for timing
timing <- data.frame(Method=character(), Training=double(), Predicting=double())
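One setup step not shown here: if no local Spark distribution is installed yet, sparklyr can download one. This is a one-time step; the version below is only an assumption, not necessarily the one used for this project.
# One-time setup (sketch): install a local Spark distribution if none is present;
# the version number is an assumption
spark_install(version = "2.3.0")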
The data set is the same one used in my project 4 - product ratings for beauty products sold on Amazon.com, downloaded from Kaggle.com (https://www.kaggle.com/skillsmuggler/amazon-ratings). The original set contained 2,023,070 observations and 4 variables - User ID, Product ID, Rating (from 1 to 5), and Time Stamp. To make it more manageable, it was reduced to 3,562 users, 9,647 products, and 68,565 ratings. This is still large enough that various processes take from a few seconds to a few minutes, making it possible to roughly compare performance.
# Data import
ratings <- read.csv(paste0("https://raw.githubusercontent.com/ilyakats/CUNY-DATA643/",
"master/Project%204/ratings_Short.csv"))
To have something to compare the Spark code against, I first built a simple ALS model using the recommenderlab package. Although I have not previously built ALS models, the R code is very similar to previous projects. ALS also appears to be one of the simpler recommender methods to set up in Spark.
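For reference, the default ALS parameters that recommenderlab will use (number of latent factors, regularization, number of iterations) can be listed from its method registry - a quick sketch, output not shown.
# Inspect the default parameters used by Recommender(..., method = "ALS")
recommenderRegistry$get_entry("ALS", dataType = "realRatingMatrix")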
# Data prep
ratingsMatrix <- sparseMatrix(as.integer(ratings$UserId), as.integer(ratings$ProductId), x = ratings$Rating)
colnames(ratingsMatrix) <- levels(ratings$ProductId)
rownames(ratingsMatrix) <- levels(ratings$UserId)
amazon <- as(ratingsMatrix, "realRatingMatrix")
# Train/test split
set.seed(88)
eval <- evaluationScheme(amazon, method = "split", train = 0.8, given = 5, goodRating = 3)
train <- getData(eval, "train")
known <- getData(eval, "known")
unknown <- getData(eval, "unknown")
# Training
tic()
modelALS <- Recommender(train, method = "ALS")
train_time <- toc(quiet = TRUE)
# Predicting
tic()
predALS <- predict(modelALS, newdata = known, type = "ratings")
predict_time <- toc(quiet = TRUE)
timing <- rbind(timing, data.frame(Method = "recommenderlab",
Training = round(train_time$toc - train_time$tic, 2),
Predicting = round(predict_time$toc - predict_time$tic, 2)))
# Accuracy
accALS <- calcPredictionAccuracy(predALS, unknown)
Similar modeling can be done with Spark. The general process is very simple - set up a local Spark instance, copy the relevant data frames into Spark, build the model and run predictions, and compare the results. As in previous projects, the data is split into training and testing sets (80/20 split) and results are evaluated mostly using RMSE.
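As a side note, the 80/20 split can also be done inside Spark itself rather than in base R. The sketch below is not used in this project; it assumes the connection sc and the integer-coded data frame spark_df created in the code that follows.
# Alternative (sketch, not used here): copy the full data frame once, then split inside Spark
spark_all <- sdf_copy_to(sc, spark_df, "all_ratings", overwrite = TRUE)
splits <- sdf_random_split(spark_all, training = 0.8, test = 0.2, seed = 88)
spark_train_alt <- splits$training
spark_test_alt <- splits$test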
# Connection
sc <- spark_connect(master = "local")
# Prepare data
spark_df <- ratings
spark_df$UserId <- as.integer(spark_df$UserId)
spark_df$ProductId <- as.integer(spark_df$ProductId)
# Split for training and testing
which_train <- sample(x = c(TRUE, FALSE), size = nrow(spark_df),
replace = TRUE, prob = c(0.8, 0.2))
train_df <- spark_df[which_train, ]
test_df <- spark_df[!which_train, ]
# Move to Spark
spark_train <- sdf_copy_to(sc, train_df, "train_ratings", overwrite = TRUE)
spark_test <- sdf_copy_to(sc, test_df, "test_ratings", overwrite = TRUE)
# Build model
tic()
sparkALS <- ml_als(spark_train, max_iter = 5, nonnegative = TRUE,
rating_col = "Rating", user_col = "UserId", item_col = "ProductId")
train_time <- toc(quiet = TRUE)
# Run prediction
tic()
sparkPred <- sparkALS$.jobj %>%
invoke("transform", spark_dataframe(spark_test)) %>%
collect()
predict_time <- toc(quiet = TRUE)
timing <- rbind(timing, data.frame(Method = "Spark",
Training = round(train_time$toc - train_time$tic, 2),
Predicting = round(predict_time$toc - predict_time$tic, 2)))
sparkPred <- sparkPred[!is.na(sparkPred$prediction), ] # Remove NaN due to data set splitting
# Calculate error
mseSpark <- mean((sparkPred$Rating - sparkPred$prediction)^2)
rmseSpark <- sqrt(mseSpark)
maeSpark <- mean(abs(sparkPred$Rating - sparkPred$prediction))
# Disconnect
spark_disconnect(sc)
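A note on the prediction step above: the low-level invoke("transform", ...) call is what newer sparklyr versions wrap in ml_predict(). A rough equivalent - a sketch assuming a sparklyr version that exports ml_predict(), run before disconnecting - would be:
# Equivalent higher-level prediction call (sketch; run before spark_disconnect)
sparkPred2 <- ml_predict(sparkALS, spark_test) %>% collect()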
The data was split differently for the recommenderlab and Spark models; however, looking at the corresponding RMSE values we can get a general idea of the accuracy of the two models. The values are very similar, which is to be expected when the same method (ALS) is used on the same data. The minor difference is due to the different observations used for training and to implementation differences.
accuracy <- rbind(accALS, data.frame(RMSE = rmseSpark, MSE = mseSpark, MAE = maeSpark))
rownames(accuracy) <- c("recommenderlab ALS", "Spark ALS")
knitr::kable(accuracy, format = "html") %>%
kableExtra::kable_styling(bootstrap_options = c("striped", "hover"))
| | RMSE | MSE | MAE |
|---|---|---|---|
| recommenderlab ALS | 1.386013 | 1.921033 | 1.084437 |
| Spark ALS | 1.353287 | 1.831384 | 1.047647 |
The bigger difference is in performance. With the recommenderlab package it took over 4 minutes to run the prediction on the testing set, while Spark finished prediction in about a second. The recommenderlab training time of essentially zero is misleading: its ALS implementation appears to defer the actual factorization until prediction time, which is why the prediction step takes so long. Training is noticeably slower with Spark, but still just a few seconds. Since this evaluation was done on a local instance of Spark, it was still subject to local hardware limitations. If the work were spread over multiple nodes, performance could be improved further. This is not possible with a straight recommenderlab implementation.
knitr::kable(timing, format = "html", row.names = FALSE) %>%
kableExtra::kable_styling(bootstrap_options = c("striped", "hover"))
| Method | Training (sec) | Predicting (sec) |
|---|---|---|
| recommenderlab | 0.00 | 251.64 |
| Spark | 6.31 | 1.25 |
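Since ggplot2 and tidyr are already loaded, the timing table can also be shown graphically. This is a sketch assuming a tidyr version that provides pivot_longer().
# Bar chart of training vs. prediction time per method (sketch)
timing %>%
  pivot_longer(c(Training, Predicting), names_to = "Step", values_to = "Seconds") %>%
  ggplot(aes(x = Method, y = Seconds, fill = Step)) +
  geom_col(position = "dodge") +
  labs(title = "Training and prediction time by method", y = "Seconds")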
Even running just a local instance, Spark improved overall performance. This is clearly the biggest advantage of distributed processing. The biggest disadvantage is also fairly obvious - a more complex implementation. I believe this is the main tradeoff.
For a simple recommender system, implementing Spark would be overkill - not enough benefit for the effort. My first project attempted opera recommendations. According to Operabase (http://operabase.com) there are about 25,000 performances a year; since the same production is performed multiple times, the number of annual productions is just a few thousand. That is a fairly low number that can probably be handled by a single server (of course, the number of users and ratings should also be considered). On the other hand, something like Netflix, a well-worn recommender system example, definitely needs very high performance.
Additionally, it is important to consider how often a model should be updated. If a recommender system needs to respond to changes quickly, performance again becomes key and distributed processing is worth the effort.