Goal

Implement a recommender system in R and try to run it on Apache Spark.

Data

Jester dataset from recommenderlab.

Import required libraries and data

library(recommenderlab)
library(ggplot2)
library(sparklyr)
data(Jester5k) 

About data

The Jester5k dataset is included in the recommenderlab package and contains “5000 users from the anonymous ratings data from the Jester Online Joke Recommender System.”

Printing rowcounts summary

## number of ratings per user
summary(rowCounts(Jester5k)) 
##    Min. 1st Qu.  Median    Mean 3rd Qu.    Max. 
##   36.00   53.00   72.00   72.42  100.00  100.00

As we can see, each user has rated at least 36 and at most 100 jokes.

Printing histogram

hist(getRatings(Jester5k), main="Distribution of ratings")

As the histogram shows, ratings range from -10.00 to 10.00.

Training and Testing Data

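Now we create the training (75%) and testing (25%) data sets. For each test user, 25 ratings are given to the recommender (given = 25) and the remaining ratings are held out for evaluation; a rating of 0.1 or higher counts as a good rating (goodRating = 0.1).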

#Normalize the ratings
Jester5k <- normalize(Jester5k)
train_records <- evaluationScheme(data = Jester5k, method = "split", train = 0.75, given = 25, goodRating = 0.1)
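The pieces produced by the split can be inspected with getData. The following is a minimal sketch, not part of the original run: the object name scheme and the fixed seed are assumptions added for illustration, and the data is left unnormalized here.

```r
library(recommenderlab)
data(Jester5k)

set.seed(1)  # assumption: a fixed seed so the random split is repeatable

# Same split settings as above, stored under a hypothetical name
scheme <- evaluationScheme(data = Jester5k, method = "split",
                           train = 0.75, given = 25, goodRating = 0.1)

dim(getData(scheme, "train"))    # users used to fit the model
dim(getData(scheme, "known"))    # test users: the ratings shown to the model
dim(getData(scheme, "unknown"))  # test users: the held-out ratings

# Each test user should expose exactly `given` ratings to the model
summary(rowCounts(getData(scheme, "known")))
```

The "known" and "unknown" parts cover the same test users, so predictions made from "known" can later be compared against "unknown".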

Predict Recommendations (without spark)

The recommender is built with user-based collaborative filtering (UBCF), using cosine similarity to compare users.
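To make the similarity measure concrete, here is a small base R sketch of cosine similarity between two users, computed over the jokes both have rated. The helper cosine_sim and the three rating vectors are hypothetical, and recommenderlab's internal handling of missing ratings may differ in detail.

```r
# Cosine similarity between two users, using only co-rated jokes.
# Hypothetical helper for illustration; not a recommenderlab function.
cosine_sim <- function(a, b) {
  both <- !is.na(a) & !is.na(b)            # jokes rated by both users
  if (!any(both)) return(NA_real_)         # no overlap, similarity undefined
  sum(a[both] * b[both]) /
    (sqrt(sum(a[both]^2)) * sqrt(sum(b[both]^2)))
}

# Made-up ratings for three users on four jokes (NA = not rated)
u1 <- c(4, -2, NA, 6)
u2 <- c(2, -1, 3, 3)
u3 <- c(-4, 2, 1, NA)

cosine_sim(u1, u2)  # u2 rates the shared jokes in the same direction as u1
cosine_sim(u1, u3)  # u3 rates the shared jokes in the opposite direction
```

On the shared jokes, u1 is an exact positive multiple of u2 and an exact negative multiple of u3, so the two values come out as 1 and -1. UBCF uses such similarities to pick a user's nearest neighbours and predicts ratings from theirs.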

start_time <- Sys.time()
# Learn a recommender model from the training data
recommender <- Recommender(getData(train_records, "train"), "UBCF", parameter = list(method = "Cosine"))
## Warning in .local(x, ...): x was already normalized by row!
# Predict ratings for the test users from their known ratings
user_cosine_p <- predict(recommender, getData(train_records, "known"), type = "ratings")

end_time <- Sys.time()
end_time - start_time
## Time difference of 7.935843 secs
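Besides timing, the held-out ratings in the evaluation scheme can be used to check how good the predictions are. The sketch below is an addition, not part of the original run: it repeats the split under hypothetical names (scheme, model, pred, acc) with an assumed seed, then calls recommenderlab's calcPredictionAccuracy on the "unknown" ratings.

```r
library(recommenderlab)
data(Jester5k)

set.seed(1)  # assumption: fixed seed so the split is repeatable
scheme <- evaluationScheme(data = Jester5k, method = "split",
                           train = 0.75, given = 25, goodRating = 0.1)

# Fit UBCF with cosine similarity on the training users
model <- Recommender(getData(scheme, "train"), "UBCF",
                     parameter = list(method = "Cosine"))

# Predict ratings for the test users from their known ratings
pred <- predict(model, getData(scheme, "known"), type = "ratings")

# Compare the predictions with the held-out ratings (RMSE, MSE, MAE)
acc <- calcPredictionAccuracy(pred, getData(scheme, "unknown"))
acc
```

Reporting an error measure next to the run time makes it easier to tell whether a faster implementation gives up any accuracy.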

Predict Recommendations (with spark)

Connecting to Spark

sc <- spark_connect(master = "local")

Now we need to copy the R data into Spark using the dplyr copy_to function. However, train_records is an S4 object (an evaluationScheme), so copy_to(sc, train_records, overwrite = T) does not work. I therefore tried converting Jester5k (using the as.vector and as.data.frame functions) into a data frame that copy_to can handle.

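# This fails because train_records is an S4 object, not a data frame:
# train_records1 <- copy_to(sc, train_records, overwrite = TRUE)

# Convert Jester5k into a data frame that copy_to can handle.
# Note: as.vector() flattens the rating matrix into one long column,
# so the user and joke labels are lost.
jester <- as.vector(Jester5k@data)
jester <- as.data.frame(jester)
train_records1 <- copy_to(sc, jester, overwrite = TRUE)
# train_records <- spark_dataframe(train_records)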

Next we need to use this data with Recommender, but I again ran into data conversion issues: recommenderlab's functions expect its own ratingMatrix objects rather than a Spark table. As a result, the functions below do not use Spark.

# Everything below runs locally in recommenderlab, not in Spark
start.time <- Sys.time()
recc_model_u_c <- Recommender(getData(train_records, "train"), "UBCF", parameter = list(method = "Cosine"))
## Warning in .local(x, ...): x was already normalized by row!
user_cosine_p <- predict(recc_model_u_c, getData(train_records, "known"), type = "ratings")
as(user_cosine_p, "matrix")[1:5, 1:5]
##                j1         j2         j3         j4         j5
## u7061          NA -2.4312400         NA         NA -3.2566587
## u7299   0.7825866 -1.3181007         NA -0.1914870  0.5690177
## u20231 -0.8381206         NA -2.2555388 -3.1532354 -2.6509175
## u13120 -0.9248686 -0.7193096 -1.0160050 -1.9674776         NA
## u20747 -0.2824516 -0.7298169  0.5412395 -0.1411324  1.1066822
spark_disconnect(sc)

end.time <- Sys.time()
time.taken <- end.time - start.time
time.taken
## Time difference of 7.799139 secs
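The conversion problem described above comes from recommenderlab working on ratingMatrix objects while Spark works on tables. One possible route, which was not run for this write-up, is to coerce the ratings to a long (user, item, rating) data frame, copy that into Spark, and fit Spark's own recommender with sparklyr's ml_als. Note that this is ALS matrix factorization, not the UBCF/cosine model used above. The names ratings_long, user_id, item_id, jester_ratings, als_model, and als_pred are hypothetical, and the Spark part is guarded so it only runs when a local Spark installation is found.

```r
library(recommenderlab)
library(sparklyr)
library(dplyr)

data(Jester5k)

# Long format: one row per (user, item, rating) triple
ratings_long <- as(Jester5k, "data.frame")

# Spark ALS expects numeric ids, so map the labels to integers (illustrative)
ratings_long$user_id <- as.integer(factor(ratings_long$user))
ratings_long$item_id <- as.integer(factor(ratings_long$item))
ratings_long <- ratings_long[, c("user_id", "item_id", "rating")]

# Assumption: skip the Spark part when no local Spark installation exists
if (nrow(spark_installed_versions()) > 0) {
  sc <- spark_connect(master = "local")

  # A plain data frame can be copied into Spark, unlike the S4 objects
  ratings_tbl <- copy_to(sc, ratings_long, "jester_ratings", overwrite = TRUE)

  # ALS matrix factorization in Spark (a different algorithm from UBCF)
  als_model <- ml_als(ratings_tbl, rating ~ user_id + item_id)

  # Predicted ratings for the (user, item) pairs in the table
  als_pred <- ml_predict(als_model, ratings_tbl)
  print(head(als_pred))

  spark_disconnect(sc)
}
```

Because the algorithm differs, timings from this sketch would not be a like-for-like comparison with the UBCF run; they would only show what a Spark-native recommender costs on the same data.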

Conclusion

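The original plan was to show that Spark produces results more quickly. I was able to connect to Spark and copy data into it, but couldn’t use it for prediction because of data type conversion issues. Both timings above (about 7.9 and 7.8 seconds) therefore come from recommenderlab running locally, so they do not measure any speed-up from Spark.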