Implementing recommendation systems to work with Apache Spark.
Jester dataset from recommenderlab.
library(recommenderlab)
library(ggplot2)
library(sparklyr)
data(Jester5k)
The Jester5k dataset ships with the recommenderlab package and contains “5000 users from the anonymous ratings data from the Jester Online Joke Recommender System.”
Printing rowcounts summary
## number of ratings per user
summary(rowCounts(Jester5k))
## Min. 1st Qu. Median Mean 3rd Qu. Max.
## 36.00 53.00 72.00 72.42 100.00 100.00
As the summary shows, each user in the sample has rated at least 36 jokes and at most 100 jokes.
Printing histogram
hist(getRatings(Jester5k), main="Distribution of ratings")
The histogram shows that ratings range from -10.00 to 10.00.
Next we create a training set (75% of the users) and a test set (the remaining 25%).
#Normalize the ratings
Jester5k <- normalize(Jester5k)
train_records <- evaluationScheme(data = Jester5k, method = "split", train = 0.75, given = 25, goodRating = 0.1)
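To make the split concrete, here is a minimal base R sketch of what a "split" scheme with a `given` parameter does conceptually: users are divided into training and test groups, and for each test user a fixed number of ratings is kept as known while the rest are withheld for checking predictions. The toy matrix, the names `train_share`, `known` and `unknown`, and the small sizes are assumptions for illustration; this is not recommenderlab's internal code.

```r
# Toy ratings matrix: 8 users x 6 jokes, made-up values in [-10, 10].
set.seed(1)
n_users <- 8
n_items <- 6
ratings <- matrix(round(runif(n_users * n_items, -10, 10), 2),
                  nrow = n_users,
                  dimnames = list(paste0("u", 1:n_users),
                                  paste0("j", 1:n_items)))

train_share <- 0.75  # share of users used for training
given <- 3           # ratings per test user that the model is allowed to see

# Randomly assign users to the training and test groups.
train_idx <- sample(n_users, size = floor(train_share * n_users))
train <- ratings[train_idx, , drop = FALSE]
test  <- ratings[-train_idx, , drop = FALSE]

# For each test user, keep `given` ratings as known and hide the rest.
known <- test
unknown <- test
for (u in seq_len(nrow(test))) {
  rated <- which(!is.na(test[u, ]))
  keep <- sample(rated, given)
  known[u, -keep] <- NA   # everything except the kept ratings is hidden
  unknown[u, keep] <- NA  # the kept ratings are removed from the hold-out
}
```

In the real run above, `given = 25` plays the role of `given` here, which is feasible because every user has rated at least 36 jokes.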
The recommender is built with user-based collaborative filtering (UBCF), using cosine similarity to compare users.
start_time<- Sys.time()
#Learns a recommender model from given data
recommender <- Recommender(getData(train_records, "train"), "UBCF", parameter = list(method = "Cosine"))
## Warning in .local(x, ...): x was already normalized by row!
#Creates recommendations using a recommender model and data about new users.
user_cosine_p <- predict(recommender, getData(train_records, "known"), type="ratings")
end_time<- Sys.time()
end_time - start_time
## Time difference of 7.935843 secs
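The idea behind user-based collaborative filtering with cosine similarity can be sketched in a few lines of base R. This is a simplified illustration, assuming an unweighted average over the `k` most similar users and similarity computed only over jokes both users rated; the functions `cosine_sim` and `predict_ubcf` and the toy data are hypothetical and are not how recommenderlab implements UBCF internally.

```r
# Cosine similarity between two rating vectors, using only co-rated items.
cosine_sim <- function(a, b) {
  both <- !is.na(a) & !is.na(b)
  if (!any(both)) return(NA_real_)
  sum(a[both] * b[both]) /
    (sqrt(sum(a[both]^2)) * sqrt(sum(b[both]^2)))
}

# Predict the missing ratings of `new_user` as the mean rating of the
# k most similar training users (unweighted, for simplicity).
predict_ubcf <- function(new_user, train, k = 2) {
  sims <- apply(train, 1, cosine_sim, b = new_user)
  neighbours <- order(sims, decreasing = TRUE)[seq_len(min(k, length(sims)))]
  missing <- which(is.na(new_user))
  colMeans(train[neighbours, missing, drop = FALSE], na.rm = TRUE)
}

# Toy training data: 3 users x 4 jokes (made-up values).
train <- matrix(c( 4, -2,  3,  1,
                   4, -2,  3, -5,
                  -3,  5, -4,  6),
                nrow = 3, byrow = TRUE,
                dimnames = list(c("u1", "u2", "u3"),
                                c("j1", "j2", "j3", "j4")))

# A new user who has not rated joke j4.
new_user <- c(j1 = 2, j2 = -1, j3 = 1.5, j4 = NA)

pred <- predict_ubcf(new_user, train, k = 2)
```

Here `u1` and `u2` point in the same direction as the new user on the co-rated jokes, so they are chosen as neighbours and the prediction for `j4` is the mean of their two `j4` ratings.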
sc <- spark_connect(master = "local")
Next we need to copy the R data into Spark using the dplyr copy_to function. However, train_records is an S4 object, so copy_to(sc, train_records, overwrite = T) does not work. As a workaround, I converted Jester5k into a plain data frame (using the as.vector and as.data.frame functions) that copy_to can handle. Note that as.vector flattens the ratings matrix into a single long column, so the user and joke labels are lost in this conversion.
#train_records1<-copy_to(sc, train_records, overwrite = T)
#convert Jester5k into data which copy_to function can understand.
jester <- as.vector(Jester5k@data)
jester <- as.data.frame(jester)
train_records1<-copy_to(sc, jester, overwrite = T)
#train_records<-spark_dataframe(train_records)
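One way to keep the user and joke information when moving ratings into a flat table is a long format with one row per rating. The sketch below does this in base R on a tiny made-up matrix; the column names `user`, `item`, `rating` and the label columns are assumptions chosen for illustration. recommenderlab also documents a coercion `as(Jester5k, "data.frame")` that yields user, item and rating columns, and a long table of this shape is what Spark's ALS recommender (exposed in sparklyr as `ml_als`) works with, although neither was tried in this write-up.

```r
# Toy ratings matrix with missing entries (NA = joke not rated).
ratings <- matrix(c(1.5,  NA, -3.0,
                     NA, 2.0,  0.5),
                  nrow = 2, byrow = TRUE,
                  dimnames = list(c("u7061", "u7299"),
                                  c("j1", "j2", "j3")))

# Positions (row, col) of all observed ratings.
idx <- which(!is.na(ratings), arr.ind = TRUE)

# One row per observed rating, with integer ids plus the original labels.
ratings_long <- data.frame(
  user       = as.integer(idx[, "row"]),
  item       = as.integer(idx[, "col"]),
  rating     = ratings[idx],
  user_label = rownames(ratings)[idx[, "row"]],
  item_label = colnames(ratings)[idx[, "col"]],
  row.names  = NULL,
  stringsAsFactors = FALSE
)
```

A plain data frame like `ratings_long` is the kind of object that `copy_to(sc, ratings_long, overwrite = TRUE)` accepts, since it contains only atomic columns.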
The next step would be to use this Spark table with Recommender, but I ran into data conversion issues again, because getData and Recommender work on recommenderlab's own rating-matrix objects rather than on Spark tables. The functions below therefore do not use Spark.
start.time<- Sys.time()
recc_model_u_c <- Recommender(getData(train_records, "train"), "UBCF", parameter = list(method = "Cosine"))
## Warning in .local(x, ...): x was already normalized by row!
user_cosine_p <- predict(recc_model_u_c, getData(train_records, "known"), type="ratings")
as(user_cosine_p, "matrix")[1:5,1:5]
## j1 j2 j3 j4 j5
## u7061 NA -2.4312400 NA NA -3.2566587
## u7299 0.7825866 -1.3181007 NA -0.1914870 0.5690177
## u20231 -0.8381206 NA -2.2555388 -3.1532354 -2.6509175
## u13120 -0.9248686 -0.7193096 -1.0160050 -1.9674776 NA
## u20747 -0.2824516 -0.7298169 0.5412395 -0.1411324 1.1066822
spark_disconnect(sc)
end.time<- Sys.time()
time.taken <- end.time - start.time
time.taken
## Time difference of 7.799139 secs
The original plan was to show that Spark produces the result faster. I was able to connect to Spark and copy data into it, but could not use it for prediction because of the data type conversion issues.