Implementing a Recommender System on Spark

Using the sparklyr package in R to implement a recommender system with ALS (alternating least squares) matrix factorization.

# install sparklyr on first use; library() loads it after a fresh install,
# since require() returns FALSE when the package was not yet installed
if (!require('sparklyr')) { install.packages('sparklyr'); library(sparklyr) }
## Loading required package: sparklyr
if (!require('dplyr')) { install.packages('dplyr'); library(dplyr) }
## Loading required package: dplyr
## 
## Attaching package: 'dplyr'
## The following objects are masked from 'package:stats':
## 
##     filter, lag
## The following objects are masked from 'package:base':
## 
##     intersect, setdiff, setequal, union
# install a local Spark 2.1.0 and point sparklyr at a Java 8 runtime
sparklyr::spark_install(version = "2.1.0")
java_path <- normalizePath('C:/Program Files/Java/jre1.8.0_131')
Sys.setenv(JAVA_HOME = java_path)
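
As a quick check that the install succeeded, sparklyr's spark_installed_versions() helper lists the Spark builds available locally:

# confirm the locally installed Spark versions before connecting
spark_installed_versions()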

# connect to a local (single-machine) Spark master
sc <- spark_connect(master = "local")
## * Using Spark: 2.1.0
# the full 24M-rating file ran out of memory at the prediction step,
# so the smaller MovieLens 100K dataset is used instead
# ratings <- read.csv('C:\\Users\\10121760\\Downloads\\ml-latest\\ml-latest\\ratings.csv')
# ratings <- ratings[, 1:3]
# names(ratings) <- c('user', 'item', 'rating')
ratings <- read.csv('C:\\Users\\10121760\\Documents\\cuny\\2017Summer\\ratings.csv')
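
If the full 24M-rating file is still of interest, one workaround is to downsample it in R before anything is copied to Spark. This is only a sketch: the 10% fraction and the seed are arbitrary choices, and the path is the one from the commented-out code above.

# read the full file, keep the first three columns, and sample 10% of the rows
full <- read.csv('C:\\Users\\10121760\\Downloads\\ml-latest\\ml-latest\\ratings.csv')
full <- full[, 1:3]
names(full) <- c('user', 'item', 'rating')
set.seed(1099)
ratings <- full[sample(nrow(full), size = round(0.1 * nrow(full))), ]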

class(ratings)
## [1] "data.frame"
dim(ratings)
## [1] 100000      3
summary(ratings)
##       user            item            rating    
##  Min.   :  1.0   Min.   :   1.0   Min.   :1.00  
##  1st Qu.:254.0   1st Qu.: 175.0   1st Qu.:3.00  
##  Median :447.0   Median : 322.0   Median :4.00  
##  Mean   :462.5   Mean   : 425.5   Mean   :3.53  
##  3rd Qu.:682.0   3rd Qu.: 631.0   3rd Qu.:4.00  
##  Max.   :943.0   Max.   :1682.0   Max.   :5.00
# copy the ratings data frame from R into Spark
ratings_tbl <- sdf_copy_to(sc, ratings, overwrite = TRUE)
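
Because sdf_copy_to() registers the data as a Spark table, a quick check can confirm it now lives on the Spark side; src_tbls() (from dplyr) lists the registered tables, and printing head() of the tbl executes in Spark and returns only a few rows to R:

# the table should appear under its default name, "ratings"
src_tbls(sc)
head(ratings_tbl)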

# partition the Spark table into training and test sets
partitions <- ratings_tbl %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)
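
sdf_partition() splits the data randomly inside Spark, so the resulting sizes can be verified without collecting anything; sdf_nrow() computes the counts on the Spark side:

# each half should hold roughly 50,000 of the 100,000 ratings
sdf_nrow(partitions$training)
sdf_nrow(partitions$test)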

# fit an ALS matrix factorization model with Spark MLlib
model <- ml_als_factorization(partitions$training, rating.column = "rating",
                              user.column = "user",
                              item.column = "item",
                              iter.max = 5, regularization.parameter = 0.01, 
                              implicit.preferences = TRUE, alpha = 1.0)
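
For reference, with explicit feedback MLlib's ALS minimizes the regularized squared error over the observed ratings,

$$\min_{U,V} \sum_{(u,i)\ \text{observed}} \left(r_{ui} - u_u^\top v_i\right)^2 + \lambda \left( \sum_u \lVert u_u \rVert^2 + \sum_i \lVert v_i \rVert^2 \right),$$

where $\lambda$ is regularization.parameter. Setting implicit.preferences = TRUE switches to the Hu-Koren-Volinsky implicit-feedback formulation, which replaces each rating with a binary preference $p_{ui} = 1[r_{ui} > 0]$ weighted by a confidence $c_{ui} = 1 + \alpha\, r_{ui}$ (here $\alpha$ is the alpha argument):

$$\min_{U,V} \sum_{u,i} c_{ui} \left(p_{ui} - u_u^\top v_i\right)^2 + \lambda \left( \sum_u \lVert u_u \rVert^2 + \sum_i \lVert v_i \rVert^2 \right).$$

Under this objective the model predicts preference scores near the 0-1 range rather than values on the 1-5 rating scale, which matters when interpreting the RMSE below.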

summary(model)
##                  Length Class      Mode       
## item.factors     11     data.frame list       
## user.factors     11     data.frame list       
## data              2     spark_jobj environment
## ml.options        7     ml_options list       
## model.parameters  2     -none-     list       
## .call             9     -none-     call       
## .model            2     spark_jobj environment
# call transform() on the underlying Spark ML model to score the test set,
# then pull the predictions back into R
predictions <- model$.model %>%
  invoke("transform", spark_dataframe(partitions$test)) %>%
  collect()
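
collect() pulls every scored row back into R, which is the step that exhausted memory on the 24M-rating file. As a sketch of an alternative (assuming the usual dplyr-to-Spark-SQL translation), the filtering and aggregation can stay in Spark so that only one row is ever collected; the run below continues with the in-R version:

# sketch: register the predictions as a Spark table and aggregate there
predictions_tbl <- model$.model %>%
  invoke("transform", spark_dataframe(partitions$test)) %>%
  sdf_register("predictions")

predictions_tbl %>%
  filter(!is.na(prediction)) %>%             # drop cold-start NAs in Spark
  mutate(sq_err = (prediction - rating)^2) %>%
  summarise(mse = mean(sq_err)) %>%
  collect()                                  # returns a single row to R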

# users or items that appear only in the test split have no learned factors,
# so their predictions are NA (cold start); drop those rows
predictions <- predictions[!is.na(predictions$prediction), ]

# RMSE on the test set
RMSE <- sqrt(mean((predictions$prediction - predictions$rating)^2))
RMSE
## [1] 3.267737
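
An RMSE above 3 on a 1-5 rating scale is very large, which is consistent with the implicit-feedback objective described earlier: the model predicts confidence-weighted preferences near 0-1, not ratings. Since these MovieLens ratings are explicit, a natural variation to try (a sketch, not run here; all other parameters unchanged) is re-fitting with implicit.preferences = FALSE:

# sketch: explicit-feedback ALS on the same training split
model_explicit <- ml_als_factorization(partitions$training,
                                       rating.column = "rating",
                                       user.column = "user",
                                       item.column = "item",
                                       iter.max = 5,
                                       regularization.parameter = 0.01,
                                       implicit.preferences = FALSE)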
spark_disconnect(sc)

Summary and Findings:

  1. It took a few attempts to get sparklyr working, including downloading JRE 8 and setting the JAVA_HOME directory.
  2. Translating an R modeling workflow to Spark is not straightforward; ALS is one of the few recommendation models exposed through sparklyr's MLlib interface.
  3. The training and test sets are partitioned inside Spark using sdf_partition().
  4. After running out of memory several times on the full 24M-rating file, I used the smaller 100K dataset. Quantifying the performance difference with and without Spark will take more time.
  5. sparklyr integrates Spark with the RStudio IDE, which will make further exploration easier.

Reference:

https://rpubs.com/enylander/263194