The goal of this project is give practice beginning to work with a distributed recommender system. Adapt one of your recommendation systems to work with Apache Spark and compare the performance with your previous iteration.
In this project we are using partial code from Project 2 to compare code excecution speed in R, Local Databricks, Community Databricks and Azure Databricks
We begin by creating a function to do the following:
We use the system.time to get user, system and elapse times for the execution of the function created. Results are below:
#No Spark
nsst<-system.time(ld())
nsst
## user system elapsed
## 48.18 1.02 56.17
In this section we run the same function in a local databricks connections.
#Local Spark
sparkR.session()
## Launching java with spark-submit command C:\Users\apagan\AppData\Local\Apache\Spark\Cache/spark-2.4.6-bin-hadoop2.7/bin/spark-submit2.cmd sparkr-shell C:\Users\apagan\AppData\Local\Temp\Rtmpm4phsM\backend_port810034122525
## Java ref type org.apache.spark.sql.SparkSession id 1
Sys.getenv("SPARK_HOME")
## [1] "C:\\Users\\apagan\\AppData\\Local\\Apache\\Spark\\Cache/spark-2.4.6-bin-hadoop2.7"
sc<-spark_connect(master = "local")
Sys.sleep(20)
Here we copy the urate dataset and run the function. Results of execution times are below:
urate<-sparklyr::copy_to(sc, urate, memory=FALSE, overwrite=TRUE)
src_tbls(sc)
## [1] "urate"
lsst<-system.time(ld())
lsst
## user system elapsed
## 62.09 1.48 67.54
spark_disconnect(sc)
The same run in community databricks and azure databricks run 8 seconds and 10 seconds respectively.
These test show that local execution in R or local databricks have similar completion time when using 1 node in a cluster. Additional nodes in a local cluster give different results. In comparison, run the funciton in the community and Azure databricks reduced execution times 4X.
When running any code with large datasets and complex algorithms, a distributed system with multiple nodes in a cluster would be preferred.
Code used in analysis
knitr::opts_chunk$set(
echo = FALSE,
message = FALSE,
warning = FALSE
)
rm(list=ls())
list.of.packages <- c("alluvial","caret","caret","corrplot","corrplot","data.table","dplyr","faraway","forcats","geosphere","ggplot2","ggplot2","ggplot2","grid","gridExtra","jtools","kableExtra","knitr","leaflet","leaflet.extras","leaps","lubridate","maps","MASS","mice","naniar","pander","patchwork","prettydoc","pROC","psych","RColorBrewer","readr","reshape2","scales","stringr","tibble","tidyr","tidyverse","xgboost","widgetframe","Rcpp","mlbench","fpp2","mlr","jsonlite","devtools","sparklyr","SparkR")
new.packages <- list.of.packages[!(list.of.packages %in% installed.packages()[,"Package"])]
if(length(new.packages)) install.packages(new.packages, dependencies=TRUE)
#knitr::opts_chunk$set(echo = TRUE)
require(knitr)
library(ggplot2)
library(tidyr)
library(MASS)
library(psych)
library(kableExtra)
library(dplyr)
library(faraway)
library(gridExtra)
library(reshape2)
library(leaps)
library(pROC)
library(caret)
library(naniar)
library(pander)
library(pROC)
library(mlbench)
library(e1071)
library(fpp2)
library(mlr)
library(recommenderlab)
library(jsonlite)
library(stringr)
library(devtools)
library(sparklyr)
library(SparkR)
ld<<-function()
{
urate<<- read.csv("https://raw.githubusercontent.com/apag101/Data612/master/Projects/Project2/BX-Book-Ratings.csv", sep=";",header = TRUE)
surate<-subset(urate, Book.Rating !=0 & as.integer(User.ID) <501)
lisbn<-length(unique(surate$ISBN))
luser<-length(unique(surate$User_ID))
lbook<-length(surate$Book.Rating)
#Build Matrix
set.seed(123)
sdata.mat <- matrix(data=surate$Book.Rating,ncol=length(unique(surate$ISBN)),nrow=length(unique(surate$User.ID)))
rownames(sdata.mat)<-c(paste(unique(surate$User.ID)))
colnames(sdata.mat)<-c(paste(unique(surate$ISBN)))
srdata.mat<-as(sdata.mat, "realRatingMatrix")
srdata.mat
#Split Data
which_train<- recommenderlab::sample(x=c(TRUE, FALSE), size=nrow(srdata.mat), replace=TRUE, prob=c(0.8,0.2))
data_train <- srdata.mat[which_train,]
data_test <- srdata.mat[!which_train,]
#Item Based Collaborative Train
ibc_model<-Recommender(data= data_train, method = "IBCF", parameter = list(k=30))
model_details<-getModel(ibc_model)
n_items_top<-30
image(model_details$sim[1:n_items_top,1:n_items_top], main="Heatmap of the first rows and columns")
col_sums<-colSums(model_details$sim>0)
qplot(col_sums)+stat_bin(binwidth = 1) + ggtitle("Distribution of column count")
which_max<-order(col_sums, decreasing = TRUE)[1:6]
rownames(model_details$sim)[which_max]
#Item Based Collaborative Predict Test
n_recommend <- 6
ibc_predict<- recommenderlab::predict(object=ibc_model, newdata= data_test, n=n_recommend)
ibc_matrix <-sapply(ibc_predict@itemLabels, function(x){
colnames(srdata.mat)[x]
})
ibc<-ibc_matrix[1:6]
#User based Collaborative filtering
ubc_model<-Recommender(data= data_train, method = "UBCF")
ubc_model
model_details<-getModel(ubc_model)
n_items_top<-30
image(model_details$data[1:n_items_top,1:n_items_top], main="Heatmap of the first rows and columns")
col_sums<-colSums(model_details$data)
qplot(col_sums)+stat_bin(binwidth = 1) + ggtitle("Distribution of column count")
which_max<-order(col_sums, decreasing = TRUE)[1:6]
colnames(model_details$data)[which_max]
model_details$data
#User Based Collaborative Predict Test
n_recommend <- 6
ubc_predict<- recommenderlab::predict(object=ubc_model, newdata= data_test, n=n_recommend)
ubc_matrix <-sapply(ubc_predict@itemLabels, function(x){
rownames(srdata.mat)[x]
})
ubc<-ubc_matrix[1:6]
}
#No Spark
nsst<-system.time(ld())
nsst
#Local Spark
sparkR.session()
Sys.getenv("SPARK_HOME")
sc<-spark_connect(master = "local")
Sys.sleep(20)
urate<-sparklyr::copy_to(sc, urate, memory=FALSE, overwrite=TRUE)
src_tbls(sc)
lsst<-system.time(ld())
lsst
spark_disconnect(sc)
knitr::include_graphics("dbst.PNG")
knitr::include_graphics("azdbst.PNG")