# Using the sparklyr package in R to implement a recommender system
# ---- Setup: packages, Spark installation, local connection ----

# Install each package only when it is missing, then attach it with library().
# The previous pattern `if (!require(pkg)) install.packages(pkg)` installed a
# missing package but never attached it, so the first run on a fresh machine
# failed at the first unqualified sparklyr/dplyr call. library() also errors
# loudly if the package still cannot be loaded.
if (!requireNamespace("sparklyr", quietly = TRUE)) install.packages("sparklyr")
library(sparklyr)
if (!requireNamespace("dplyr", quietly = TRUE)) install.packages("dplyr")
library(dplyr)
##
## Attaching package: 'dplyr'
## The following objects are masked from 'package:stats':
##
## filter, lag
## The following objects are masked from 'package:base':
##
## intersect, setdiff, setequal, union

# Request a local installation of Spark 2.1.0 for sparklyr to use.
sparklyr::spark_install(version = "2.1.0")

# Point JAVA_HOME at the machine-specific JRE, but only when that directory
# actually exists, so a working JAVA_HOME is not replaced by a dead path.
# mustWork = FALSE avoids a warning when the directory is absent.
java_path <- normalizePath('C:/Program Files/Java/jre1.8.0_131', mustWork = FALSE)
if (dir.exists(java_path)) {
  Sys.setenv(JAVA_HOME = java_path)
}

# Open a connection to a local Spark instance; `sc` is used by the rest of
# the script and must be released with spark_disconnect(sc) at the end.
sc <- spark_connect(master = "local")
## * Using Spark: 2.1.0
# ---- Load the ratings data into an R data frame ----

# The larger ml-latest ratings file (about 24M records) ran out of memory at
# the prediction step, so it is left disabled here for reference:
# ratings <- read.csv('C:\\Users\\10121760\\Downloads\\ml-latest\\ml-latest\\ratings.csv')
# ratings <- ratings[, 1:3]
# names(ratings) <- c('user', 'item', 'rating')

# Smaller ratings file (100,000 rows, see dim() output below).
ratings <- read.csv('C:\\Users\\10121760\\Documents\\cuny\\2017Summer\\ratings.csv')

# The ALS call later in this script refers to these columns by name, so fail
# fast here (before any Spark work) if the file does not provide them.
stopifnot(all(c("user", "item", "rating") %in% names(ratings)))

# Quick sanity checks on what was loaded.
class(ratings)
## [1] "data.frame"
dim(ratings)
## [1] 100000 3
summary(ratings)
## user item rating
## Min. : 1.0 Min. : 1.0 Min. :1.00
## 1st Qu.:254.0 1st Qu.: 175.0 1st Qu.:3.00
## Median :447.0 Median : 322.0 Median :4.00
## Mean :462.5 Mean : 425.5 Mean :3.53
## 3rd Qu.:682.0 3rd Qu.: 631.0 3rd Qu.:4.00
## Max. :943.0 Max. :1682.0 Max. :5.00
# ---- Copy to Spark, fit ALS, and evaluate on a held-out split ----

# Copy the ratings data frame into Spark, replacing any table of the same name.
ratings_tbl <- sdf_copy_to(sc, ratings, overwrite = TRUE)

# Split the Spark table 50/50 into 'training' and 'test'; the fixed seed keeps
# the split repeatable between runs.
partitions <- ratings_tbl %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)

# Fit the ALS matrix-factorization model on the training split.
# The ratings are explicit 1-5 scores (see summary(ratings)), so the model
# must be fit in explicit-feedback mode. With implicit.preferences = TRUE the
# ratings are treated as confidence weights and the predictions are preference
# scores rather than star ratings, which makes an RMSE against `rating`
# meaningless. alpha is kept for call compatibility; it only applies to the
# implicit-feedback formulation.
model <- ml_als_factorization(
  partitions$training,
  rating.column = "rating",
  user.column = "user",
  item.column = "item",
  iter.max = 5,
  regularization.parameter = 0.01,
  implicit.preferences = FALSE,
  alpha = 1.0
)
summary(model)
## Length Class Mode
## item.factors 11 data.frame list
## user.factors 11 data.frame list
## data 2 spark_jobj environment
## ml.options 7 ml_options list
## model.parameters 2 -none- list
## .call 9 -none- call
## .model 2 spark_jobj environment

# Score the test split with the underlying Spark model object and pull the
# result back into R as a local data frame.
predictions <- model$.model %>%
  invoke("transform", spark_dataframe(partitions$test)) %>%
  collect()

# Some test rows come back without a prediction (presumably users or items
# that never appear in the training split -- confirm); drop them before
# scoring.
predictions <- predictions[!is.na(predictions$prediction), ]
if (nrow(predictions) == 0) {
  stop("No non-missing predictions on the test split; cannot compute RMSE.",
       call. = FALSE)
}

# RMSE between predicted and actual ratings on the remaining test rows.
RMSE <- sqrt(mean((predictions$prediction - predictions$rating)^2))
RMSE
# NOTE: the value previously recorded here (3.267737) was produced with
# implicit.preferences = TRUE and is not comparable; re-run the script to
# obtain the explicit-feedback RMSE.

# Release the local Spark connection.
spark_disconnect(sc)