sparklyr package and runs all Spark code in local mode. Running Spark locally is challenging, especially setting up the environment and connecting to Spark.
# Required libraries
library(recommenderlab) # Matrix/recommender functions
## Warning: package 'recommenderlab' was built under R version 3.6.3
## Loading required package: Matrix
## Loading required package: arules
## Warning: package 'arules' was built under R version 3.6.3
##
## Attaching package: 'arules'
## The following objects are masked from 'package:base':
##
## abbreviate, write
## Loading required package: proxy
## Warning: package 'proxy' was built under R version 3.6.3
##
## Attaching package: 'proxy'
## The following object is masked from 'package:Matrix':
##
## as.matrix
## The following objects are masked from 'package:stats':
##
## as.dist, dist
## The following object is masked from 'package:base':
##
## as.matrix
## Loading required package: registry
## Registered S3 methods overwritten by 'registry':
## method from
## print.registry_field proxy
## print.registry_entry proxy
library(dplyr) # Data manipulation
##
## Attaching package: 'dplyr'
## The following objects are masked from 'package:arules':
##
## intersect, recode, setdiff, setequal, union
## The following objects are masked from 'package:stats':
##
## filter, lag
## The following objects are masked from 'package:base':
##
## intersect, setdiff, setequal, union
library(tidyr) # Data manipulation
## Warning: package 'tidyr' was built under R version 3.6.3
##
## Attaching package: 'tidyr'
## The following objects are masked from 'package:Matrix':
##
## expand, pack, unpack
library(ggplot2) # Plotting
library(tictoc) # Operation timing
library(sparklyr) # Spark processing
## Warning: package 'sparklyr' was built under R version 3.6.3
# Empty results table for the timing comparison: one row per method will be
# appended later, holding training and prediction times.
timing <- data.frame(
  Method = character(0),
  Training = numeric(0),
  Predicting = numeric(0)
)
# Import data from GitHub
# stringsAsFactors = TRUE is spelled out because the prep below relies on the
# ID columns being factors: as.integer() is used to get their integer codes
# and levels() to get their labels. R >= 4.0.0 no longer converts strings to
# factors by default, which would turn the codes into NA.
ratings <- read.csv(
  "https://raw.githubusercontent.com/vijay564/DATA612/master/Project_5/ratings_Short.csv",
  stringsAsFactors = TRUE
)
# Data prep
# Build a sparse user x product matrix: factor codes are the row/column
# indices and the ratings are the stored values.
ratingsMatrix <- sparseMatrix(
  i = as.integer(ratings$UserId),
  j = as.integer(ratings$ProductId),
  x = ratings$Rating
)
# Label the dimensions with the original IDs (factor levels are in code order).
colnames(ratingsMatrix) <- levels(ratings$ProductId)
rownames(ratingsMatrix) <- levels(ratings$UserId)
# Convert to the rating-matrix class used by recommenderlab.
amazon <- as(ratingsMatrix, "realRatingMatrix")
# Split data into Test and Train
# Fixed seed so the split is reproducible between runs.
set.seed(88)
eval <- evaluationScheme(
  amazon,
  method = "split",
  train = 0.8,
  given = 5,
  goodRating = 3
)
# Pull the three partitions out of the scheme: the training set, plus the
# "known" and "unknown" portions used for prediction and scoring below.
unknown <- getData(eval, "unknown")
known <- getData(eval, "known")
train <- getData(eval, "train")
# Training
# tic()/toc() bracket each stage; toc(quiet = TRUE) returns the start and
# stop readings without printing them.
tic()
modelALS <- Recommender(train, method = "ALS")
train_time <- toc(quiet = TRUE)
# Predicting
tic()
predALS <- predict(modelALS, newdata = known, type = "ratings")
predict_time <- toc(quiet = TRUE)
# Append the recommenderlab row (elapsed = toc - tic, rounded to 2 decimals).
timing <- rbind(
  timing,
  data.frame(
    Method = "recommenderlab",
    Training = round(with(train_time, toc - tic), 2),
    Predicting = round(with(predict_time, toc - tic), 2)
  )
)
# Accuracy
# Score the predicted ratings against the held-out "unknown" ratings.
accALS <- calcPredictionAccuracy(predALS, unknown)
# Connection
# The Java location below is specific to the machine this report was built on.
# Only override JAVA_HOME when that directory actually exists; otherwise keep
# whatever JAVA_HOME the environment already provides, instead of replacing it
# with a path that is not there (normalizePath() would also warn in that case).
java_path <- "C:\\Program Files\\Java\\jre1.8.0_251"
if (dir.exists(java_path)) {
  java_path <- normalizePath(java_path)
  Sys.setenv(JAVA_HOME = java_path)
} else {
  java_path <- Sys.getenv("JAVA_HOME")
}
# Echo the value in effect so the rendered report shows which Java is used.
Sys.getenv("JAVA_HOME")
## [1] "C:\\Program Files\\Java\\jre1.8.0_251"
# Start a Spark session in local mode.
sc <- spark_connect(master = "local")
# Prepare data
# The ALS call below is given UserId/ProductId as its user and item columns,
# and these are converted to integer codes here. Going through as.factor()
# makes the conversion work whether read.csv() produced factors (R < 4.0) or
# character vectors (R >= 4.0); as.integer() on raw strings would give NA.
spark_df <- ratings
spark_df$UserId <- as.integer(as.factor(spark_df$UserId))
spark_df$ProductId <- as.integer(as.factor(spark_df$ProductId))
# Split for training and testing
# Row-wise random assignment, roughly 80% train / 20% test.
which_train <- sample(
  x = c(TRUE, FALSE),
  size = nrow(spark_df),
  replace = TRUE,
  prob = c(0.8, 0.2)
)
train_df <- spark_df[which_train, ]
test_df <- spark_df[!which_train, ]
# Move to Spark
spark_train <- sdf_copy_to(sc, train_df, "train_ratings", overwrite = TRUE)
spark_test <- sdf_copy_to(sc, test_df, "test_ratings", overwrite = TRUE)
# Build model
# Fit ALS on the Spark training table; tic()/toc() time the fit.
tic()
sparkALS <- ml_als(
  spark_train,
  rating_col = "Rating",
  user_col = "UserId",
  item_col = "ProductId",
  max_iter = 5,
  nonnegative = TRUE
)
train_time <- toc(quiet = TRUE)
# Run prediction
# Call "transform" on the model's underlying Java object with the test table,
# then collect the result back into R.
tic()
sparkPred <- collect(
  invoke(sparkALS$.jobj, "transform", spark_dataframe(spark_test))
)
predict_time <- toc(quiet = TRUE)
# Append the Spark row (elapsed = toc - tic, rounded to 2 decimals).
timing <- rbind(
  timing,
  data.frame(
    Method = "Spark",
    Training = round(with(train_time, toc - tic), 2),
    Predicting = round(with(predict_time, toc - tic), 2)
  )
)
# Remove NaN due to data set splitting
sparkPred <- sparkPred[!is.na(sparkPred[["prediction"]]), ]
# Calculate error
mseSpark <- with(sparkPred, mean((Rating - prediction)^2))
rmseSpark <- sqrt(mseSpark)
maeSpark <- with(sparkPred, mean(abs(Rating - prediction)))
# Disconnect
# Close the Spark session; everything needed is already collected into R.
spark_disconnect(sc)
# Stack the recommenderlab metrics on top of the Spark metrics and label rows.
accuracy <- rbind(
  accALS,
  data.frame(RMSE = rmseSpark, MSE = mseSpark, MAE = maeSpark)
)
rownames(accuracy) <- c("recommenderlab ALS", "Spark ALS")
# Render as a styled HTML table.
kableExtra::kable_styling(
  knitr::kable(accuracy, format = "html"),
  bootstrap_options = c("striped", "hover")
)
| | RMSE | MSE | MAE |
|---|---|---|---|
| recommenderlab ALS | 1.338125 | 1.790578 | 1.048341 |
| Spark ALS | 1.336929 | 1.787379 | 1.030275 |
With the recommenderlab package it took over 4 minutes to run the prediction on the testing set. Training is noticeably slower with Spark, but still takes just a few seconds. This evaluation was done on a local instance of Spark, which has some limitations. With a distributed environment, performance can be improved.
knitr::kable(timing, format = "html", row.names = FALSE) %>%
kableExtra::kable_styling(bootstrap_options = c("striped", "hover"))
Method | Training | Predicting |
---|---|---|
recommenderlab | 0.0 | 476.44 |
Spark | 7.5 | 2.36 |