The goal of this project is to give you practice beginning to work with a distributed recommender system. Adapt one of your recommendation systems to work with Apache Spark and compare its performance with your previous iteration.

Introduction

In this project we reuse part of the code from Project 2 to compare code execution speed in local R, local Spark, Databricks Community Edition, and Azure Databricks.

Function

We begin by creating a function to do the following:

  • Get Data
  • Transform data
  • Build a realRatingMatrix
  • Split Dataset
  • Train/Predict IBC Model
  • Train/Predict UBC Model

Local R execution

We use system.time() to get the user, system, and elapsed times for one execution of the function created above. The results are below:

#No Spark
# Times a single call of ld() in the plain local R session and prints the
# resulting proc_time object. Lines starting with ## are the output captured
# when the report was rendered.
nsst<-system.time(ld())
nsst
##    user  system elapsed 
##   48.18    1.02   56.17

Local Spark Execution

In this section we run the same function with a local Spark connection open.

#Local Spark
# Starts a SparkR session, shows the SPARK_HOME environment variable, opens a
# sparklyr connection to a local master, and then pauses for 20 seconds.
# Lines starting with ## are the output captured when the report was rendered.
sparkR.session()
## Launching java with spark-submit command C:\Users\apagan\AppData\Local\Apache\Spark\Cache/spark-2.4.6-bin-hadoop2.7/bin/spark-submit2.cmd   sparkr-shell C:\Users\apagan\AppData\Local\Temp\Rtmpm4phsM\backend_port810034122525
## Java ref type org.apache.spark.sql.SparkSession id 1
Sys.getenv("SPARK_HOME")
## [1] "C:\\Users\\apagan\\AppData\\Local\\Apache\\Spark\\Cache/spark-2.4.6-bin-hadoop2.7"
sc<-spark_connect(master = "local")
# NOTE(review): presumably this pause gives the Spark backend time to finish
# starting; the reason is not stated anywhere in the report -- confirm.
Sys.sleep(20)

Here we copy the urate dataset to Spark and run the function again. The execution times are below:

# Copies the urate data frame into the Spark connection, lists the tables the
# connection knows about, times one call of ld(), and closes the connection.
# Lines starting with ## are the output captured when the report was rendered.
urate<-sparklyr::copy_to(sc, urate, memory=FALSE, overwrite=TRUE)
src_tbls(sc)
## [1] "urate"
# NOTE(review): ld() takes no arguments and re-reads the CSV itself, so it does
# not appear to use the table copied above -- confirm what this timing measures.
lsst<-system.time(ld())
lsst
##    user  system elapsed 
##   62.09    1.48   67.54
spark_disconnect(sc)

The same run took 8 seconds on Databricks Community Edition and 10 seconds on Azure Databricks, respectively.

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1758651809032548/2988639792147538/2008539467650786/latest.html

Conclusion

These tests show that local execution in R and in local Spark have similar completion times when using 1 node in a cluster. Additional nodes in a local cluster give different results. In comparison, running the function in Databricks Community Edition and Azure Databricks reduced execution times 4X.

When running any code with large datasets and complex algorithms, a distributed system with multiple nodes in a cluster would be preferred.

APPENDIX

Code used in analysis

# Report-wide knitr chunk defaults: do not echo source code and suppress
# messages and warnings in the rendered output.
knitr::opts_chunk$set(echo = FALSE, message = FALSE, warning = FALSE)

# Install any package this report needs that is not installed yet.
#
# The former rm(list = ls()) was dropped: wiping the caller's workspace is a
# side effect a report should not have, and every object used below is
# (re)created by this script anyway.
#
# "e1071" and "recommenderlab" are attached with library() further down, so
# they must be part of this list; otherwise a fresh machine fails at the
# library() calls. unique() removes the repeated entries.
list.of.packages <- unique(c(
  "alluvial", "caret", "corrplot", "data.table", "dplyr", "faraway",
  "forcats", "geosphere", "ggplot2", "grid", "gridExtra", "jtools",
  "kableExtra", "knitr", "leaflet", "leaflet.extras", "leaps", "lubridate",
  "maps", "MASS", "mice", "naniar", "pander", "patchwork", "prettydoc",
  "pROC", "psych", "RColorBrewer", "readr", "reshape2", "scales", "stringr",
  "tibble", "tidyr", "tidyverse", "xgboost", "widgetframe", "Rcpp",
  "mlbench", "fpp2", "mlr", "jsonlite", "devtools", "sparklyr", "SparkR",
  "e1071", "recommenderlab"
))
new.packages <- setdiff(list.of.packages, rownames(installed.packages()))
if (length(new.packages) > 0) {
  install.packages(new.packages, dependencies = TRUE)
}

#knitr::opts_chunk$set(echo = TRUE)
require(knitr)
library(ggplot2)
library(tidyr)
library(MASS)
library(psych)
library(kableExtra)
library(dplyr)
library(faraway)
library(gridExtra)
library(reshape2)
library(leaps)
library(pROC)
library(caret)
library(naniar)
library(pander)
library(pROC)
library(mlbench)
library(e1071)
library(fpp2)
library(mlr)
library(recommenderlab)
library(jsonlite)
library(stringr)
library(devtools)
library(sparklyr)
library(SparkR)
#' Run the Project 2 recommender pipeline end to end (benchmark workload).
#'
#' Steps: download the book-ratings CSV, keep the non-zero ratings of users
#' whose ID is below 501, build a realRatingMatrix, split the users roughly
#' 80/20 into train and test sets, then train an item-based (IBCF) and a
#' user-based (UBCF) collaborative filtering model and predict the top six
#' items for every test user.
#'
#' Side effect: the raw ratings data frame is stored in the global variable
#' `urate`, because the Spark section of this script copies that object with
#' `copy_to()`.
#'
#' @return Invisibly, a character vector with the item labels recommended by
#'   the UBCF model for the first test user (`character(0)` when there is no
#'   test user).
ld <- function() {
  # Deliberate global assignment -- see "Side effect" above.
  urate <<- read.csv(
    "https://raw.githubusercontent.com/apag101/Data612/master/Projects/Project2/BX-Book-Ratings.csv",
    sep = ";", header = TRUE
  )

  surate <- subset(urate, Book.Rating != 0 & as.integer(User.ID) < 501)

  # Build the user x item matrix from (user, item, rating) triplets so every
  # rating ends up in the cell of the user and book it belongs to. Filling a
  # dense matrix() with the bare rating vector only recycles the values and
  # loses that association.
  triplets <- surate[, c("User.ID", "ISBN", "Book.Rating")]
  srdata.mat <- as(triplets, "realRatingMatrix")

  # Split users into train (~80%) and test (~20%); seeded for repeatability.
  set.seed(123)
  which_train <- recommenderlab::sample(
    x = c(TRUE, FALSE), size = nrow(srdata.mat),
    replace = TRUE, prob = c(0.8, 0.2)
  )
  data_train <- srdata.mat[which_train, ]
  data_test <- srdata.mat[!which_train, ]

  n_recommend <- 6
  n_items_top <- 30

  # Item labels recommended for the first user of a prediction result.
  first_user_items <- function(top_n) {
    per_user <- as(top_n, "list")
    if (length(per_user) == 0) character(0) else per_user[[1]]
  }

  # Item-based collaborative filtering: train.
  ibc_model <- Recommender(
    data = data_train, method = "IBCF", parameter = list(k = 30)
  )
  ibc_details <- getModel(ibc_model)
  # Never index past the similarity matrix when there are fewer than 30 items.
  sim_idx <- seq_len(min(n_items_top, dim(ibc_details$sim)[1]))
  # NOTE(review): the plot objects built by image() and qplot() below are not
  # printed from inside a function, so nothing is drawn; they are kept so the
  # timed workload stays comparable. Wrap them in print() if the plots are
  # actually wanted.
  image(ibc_details$sim[sim_idx, sim_idx],
        main = "Heatmap of the first rows and columns")
  col_sums <- colSums(ibc_details$sim > 0)
  qplot(col_sums) + stat_bin(binwidth = 1) +
    ggtitle("Distribution of column count")

  # Item-based collaborative filtering: predict for the test users.
  ibc_predict <- recommenderlab::predict(
    object = ibc_model, newdata = data_test, n = n_recommend
  )
  ibc <- first_user_items(ibc_predict)

  # User-based collaborative filtering: train.
  ubc_model <- Recommender(data = data_train, method = "UBCF")
  ubc_details <- getModel(ubc_model)
  row_idx <- seq_len(min(n_items_top, dim(ubc_details$data)[1]))
  col_idx <- seq_len(min(n_items_top, dim(ubc_details$data)[2]))
  image(ubc_details$data[row_idx, col_idx],
        main = "Heatmap of the first rows and columns")
  col_sums <- colSums(ubc_details$data)
  qplot(col_sums) + stat_bin(binwidth = 1) +
    ggtitle("Distribution of column count")

  # User-based collaborative filtering: predict for the test users.
  ubc_predict <- recommenderlab::predict(
    object = ubc_model, newdata = data_test, n = n_recommend
  )
  ubc <- first_user_items(ubc_predict)

  invisible(ubc)
}
# No Spark: baseline timing of one full pipeline run in the local R session.
nsst <- system.time(expr = ld())
print(nsst)

# Local Spark: start SparkR and a local sparklyr connection, copy the ratings
# to Spark, time the pipeline again, then release both Spark backends.
sparkR.session()
Sys.getenv("SPARK_HOME")
sc <- spark_connect(master = "local")
# NOTE(review): presumably this pause gives the Spark backend time to finish
# starting; the reason is not documented -- confirm it is still needed.
Sys.sleep(20)

# `urate` is the data frame left in the global environment by the earlier
# ld() call; after this line the name refers to the Spark table handle.
urate <- sparklyr::copy_to(sc, urate, memory = FALSE, overwrite = TRUE)
src_tbls(sc)

# NOTE(review): ld() re-reads the CSV and does not take the Spark table as
# input, so this timing appears to measure the same local R code while a
# Spark session is open -- confirm that this is the intended comparison.
lsst <- system.time(ld())
lsst

spark_disconnect(sc)
# The SparkR session opened by sparkR.session() was never stopped before,
# which left its JVM backend running after the report finished.
sparkR.session.stop()
# Embed two image files in the rendered report. Each call is left as its own
# top-level expression so knitr includes the image it returns.
# NOTE(review): judging by the file names these are presumably the Databricks
# Community and Azure Databricks timing screenshots -- confirm.
knitr::include_graphics("dbst.PNG")
knitr::include_graphics("azdbst.PNG")