The purpose of this project is to implement both a singular value decomposition (SVD) matrix factorization and an alternating least squares (ALS) matrix factorization within the context of a recommender system via the Spark distributed computing platform. Specifically, the SparkR toolset within DataBricks is used to implement and evaluate the prospective algorithms.
The data set to be used is a subset of the Jester Online Joke Recommender System data that was used previously for purposes of implementing a singular value decomposition algorithm (see: https://rpubs.com/jt_rpubs/287285). Specifically, a subset containing data from 24,938 users who have rated between 15 and 35 of the 100 available jokes was downloaded from the Jester dataset website.
From that site a file named jester-data-3.zip containing a compressed Excel file was downloaded and decompressed. Excel was then used to convert the resulting file to CSV format. The CSV file was then uploaded to the DataBricks platform for use with this project. The CSV file is read into a SparkR notebook within DataBricks using the SparkR read.df() function. The resulting SparkR data frame is then converted to a traditional R data frame to enable efficient data cleansing operations: Attempts at cleansing the data within a SparkR data frame demonstrated the poor performance of SparkR for such tasks.
The R / SparkR code used to load the data, sample 10,000 random rows from the data, and perform some initial data cleansing steps is shown below.
Please note that the full SparkR / DataBricks code as well as its rendered HTML output can be accessed at the following web link:
# init a sparkR session within DataBricks
sparkR.session()
library(SparkR)
sqlContext <- sparkRSQL.init(sc)
set.seed(1)
# load CSV version of jester subset from Databricks file storage
jester <- read.df(sqlContext, "/FileStore/tables/5zck6d8g1498926897746/jester_data_3-bee06.csv", source = "csv", header="false", inferSchema = "true")
# convert data to a regular R data frame for easier data cleansing
rjest <- take(jester, count(jester))
# reduce matrix to 10,000 rows
trows <- base::sample(nrow(rjest), 10000)
rjest <- rjest[trows,]
# memory cleanup
rm(trows)
# remove first column since it does not contain user ratings
rjest <- rjest[,2:ncol(rjest)]
# set all '99' values to NA since they represent missing data
rjest[,][rjest[,] == 99] <- NA
Our data set comprises ratings of up to 100 items provided by 10,000 users, for a total of 1,000,000 possible ratings. However, as we saw in the previous project referenced above, more than 752,000 of those possible ratings are missing. These missing values must be replaced with reasonable rating values before we attempt to calculate an SVD for our data set. Therefore, following the approach used for deriving a baseline predictor, we compute user and item biases across the entire data set and then replace each missing value with the sum of the raw mean and the relevant user and item biases. The SparkR / DataBricks implementation is shown below.
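In equation form, each missing rating for user \(u\) and item \(i\) is imputed with the baseline predictor \(\hat{r}_{ui} = \mu + b_u + b_i\), where \(\mu\) is the raw mean of all observed ratings and \(b_u\) and \(b_i\) are the user and item biases; the imputed values are then clipped to the valid [-10, 10] ratings range.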
# get mean value for entire matrix
raw_mean <- mean(as.vector(as.matrix(rjest)), na.rm = TRUE )
raw_mean
# count number of non-NA's in each row of training set
row_valid <- rowSums(!is.na(rjest[,]))
# count number of non-NA's in each column of training set
col_valid <- colSums(!is.na(rjest[,]))
# calculate user biases
user_biases <- rowMeans(rjest[,] - raw_mean, na.rm = TRUE) / row_valid
# calculate item biases
item_biases <- colMeans(rjest[,] - raw_mean, na.rm = TRUE) / col_valid
# memory cleanup
rm(row_valid, col_valid)
# make a copy of the original matrix
tjest <- rjest
for (i in 1:nrow(tjest)) {
  for (j in 1:ncol(tjest)) {
    # if the matrix element has an NA, fill in with baseline predictor
    if(is.na(tjest[i,j])) {
      tjest[i,j] <- raw_mean + user_biases[i] + item_biases[j]
      # ensure new values are within valid ratings bounds
      if (tjest[i,j] > 10) tjest[i,j] <- 10
      if (tjest[i,j] < -10) tjest[i,j] <- -10
    } # end if
  } # end for j
} # end for i
Prior to calculating the SVD we must convert the data frame containing the imputed data to a matrix. We can then identify the rank \(r\) of that matrix and thereby quantify the sizes of the matrices \(U\), \(\Sigma\), and \(V\):
# convert data frame to matrix
rmat3 <- as.matrix(tjest)
# get the rank of the matrix: rank is 100
qr(rmat3)$rank
The rank of the matrix is 100, which is equivalent to the number of columns in the matrix. Given that the matrix itself is \(M = 10,000\) x \(N = 100\), the matrix is full rank and our SVD matrices will be sized as follows:
Matrix \(U\) will be \(M \times N\)
Matrix \(\Sigma\) will be \(N \times N\)
Matrix \(V\) will be \(N \times N\)
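In matrix form, this corresponds to the thin SVD \(A = U\,\Sigma\,V^{T}\), where \(A\) denotes the \(M \times N\) matrix of imputed ratings (a notational label introduced here), the columns of \(U\) and \(V\) are orthonormal, and \(\Sigma\) is diagonal with the singular values arranged in decreasing order.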
The SVD is calculated using R’s svd() function:
# The SVD is calculated using R's svd() function:
j_svd <- svd(rmat3)
An analysis of the singular values (shown in the DataBricks URL referenced above) indicates that the first 43 singular values explain approximately 90% of the variability of the matrix containing the imputed values, so we truncate each SVD component matrix accordingly.
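The vector perc_vec referenced in the truncation code below is not defined in this excerpt (its derivation appears in the full notebook linked above). A minimal sketch of one way it could be constructed, assuming it holds the cumulative proportion of variability explained by successive singular values (measured via the squared singular values), is:
# assumed construction of perc_vec: cumulative share of variability (squared singular values)
# explained by the first 1..N singular values; the full notebook may compute this differently
perc_vec <- cumsum(j_svd$d^2) / sum(j_svd$d^2)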
# Setting $k = 43$ will retain 90% of variability within the imputed values matrix. As such, we truncate matrices $U$, $\Sigma$, and $V$ accordingly:
# load Matrix library - needed for 'Diagonal()' function
library(Matrix)
k = length(perc_vec[perc_vec <= .90])
s_k <- Diagonal(x = j_svd$d[1:k])
U_k <- j_svd$u[, 1:k]
V_k <- t(j_svd$v)[1:k, ]
The three truncated matrices are then used to calculate a new recommendation matrix, which itself will be an approximation of the matrix containing the imputed values. Any predicted rating values falling outside the valid [-10, 10] ratings range are set equal to the nearest range boundary value:
# Approximating the Matrix of Imputed Values
# The three truncated matrices are then used to calculate a new prediction matrix which itself will be an *approximation* of the matrix containing the imputed values.
predicted <- U_k %*% s_k %*% V_k
# check dimensions to ensure they match original matrix
dim(predicted)
# convert to standard matrix format
pred_mat <- as.matrix(predicted)
# set colnames to match original matrix
colnames(pred_mat) <- colnames(tjest)
rownames(pred_mat) <- rownames(tjest)
# set all vals > 10 to 10 to ensure items are within valid range
pred_mat[,][pred_mat[,] > 10] <- 10
# set all vals < -10 to -10
pred_mat[,][pred_mat[,] < -10] <- -10
# compare the predicted ratings with the contents of the matrix containing the imputed ratings. Summary statistics for these two matrices show that the predicted ratings have a mean and median nearly identical to that of the imputed data:
summary(as.vector(as.matrix(tjest)))
summary(as.vector(pred_mat))
We now have three separate sets of ratings data:
The original 10,000 x 100 Jester ratings containing more than 752,000 missing values;
A 10,000 x 100 matrix containing the original non-NA Jester ratings and imputed values for the 752,000 missing values;
A 10,000 x 100 matrix representing the results of multiplying the dimensionality-reduced component matrices we obtained from our SVD efforts. This last matrix is essentially a set of predicted ratings.
We’d like to determine whether or not the matrix generated by the SVD/dimensionality reduction process is a viable proxy for the original Jester ratings.
As a first step in this analysis, we can calculate the root mean square error (RMSE) of the differences between the original Jester ratings and the predicted ratings we obtained from the SVD/dimensionality reduction process, as shown in the code below.
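RMSE takes its standard form here, \(\mathrm{RMSE} = \sqrt{\operatorname{mean}\big((r_{ui} - \hat{r}_{ui})^{2}\big)}\), where \(r_{ui}\) is an observed rating and \(\hat{r}_{ui}\) is the corresponding SVD-based prediction; in the loop below only the non-missing ratings contribute squared-error terms, while missing cells contribute zero.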
# calc RMSE for training set: iterate over entire matrix
err_cnt <- 1
train_errs <- data.frame(matrix(ncol = 1, nrow = nrow(rjest) * ncol(rjest)) )
colnames(train_errs) <- "SE"
# init all items to zero
train_errs$SE <- 0
# train_errs <- data.frame(SE = numeric(0), nrow = nrow(rjest) * ncol(rjest), stringsAsFactors = FALSE)
for (i in 1:nrow(rjest)) {
  for (j in 1:ncol(rjest)) {
    # if test set element is not null, we need to calc error; if it is null, error is zero for
    # that element
    if (!is.na(rjest[i,j])) {
      # calculate error for element of test set and square it
      sq_err <- (rjest[i,j] - pred_mat[i,j])^2
      # add item to data frame
      train_errs$SE[err_cnt] <- sq_err
      err_cnt <- err_cnt + 1
    } # end if
  } # end for j
} # end for i
RMSE_train <- sqrt(base::mean(train_errs$SE))
RMSE_train
As shown in the DataBricks link provided earlier, the RMSE for the SVD model is 0.8477129.
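For reference, a vectorized sketch that reproduces the same calculation without the explicit loops (keeping the loop's convention of counting missing entries as zero error while dividing by all 1,000,000 matrix cells) might look as follows; the name RMSE_train_vec is introduced here purely for illustration:
# sketch: vectorized equivalent of the RMSE loop above; NA cells contribute zero error
# but are still counted in the denominator, matching the loop's convention
sq_err_mat <- (as.matrix(rjest) - pred_mat)^2
sq_err_mat[is.na(sq_err_mat)] <- 0
RMSE_train_vec <- sqrt(sum(sq_err_mat) / (nrow(rjest) * ncol(rjest)))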
The SparkR toolset provided within DataBricks includes an ALS matrix factorization function. However, that function does not make use of a traditional user-item matrix for purposes of generating the ALS factorization: Instead, it requires that the user-item matrix be converted into a long-format SparkR data frame containing one row for each user/item/rating combination (for example, a rating of 2.35 given by user 1 to item 7 becomes the single row userID = 1, ItemID = 7, Rating = 2.35; the values here are purely illustrative). As such, we load R's tidyr package within the SparkR environment so that its gather() function can be used to convert the user-item matrix to a long-format data frame:
# check to see whether tidyr is installed. If not, install + load
# install.packages("tidyr")
if(!require(tidyr)){
install.packages("tidyr")
library(tidyr)
}
Testing showed that the SparkR ALS function will not accept ‘NA’ values for any rating within the user-item data: If ‘NA’ values are present the function will fail to execute and will generate an error message. Testing also showed that a long-format data frame comprised of only the valid ratings fails to produce valid ratings estimates. As such, we make use of the matrix containing the baseline predictor imputations for the ‘NA’ values as the basis of our ALS factorization.
Before converting that matrix to long format, we add a unique user ID to each row. The matrix is then converted to a long-format data frame using the gather() function. Non-numeric components of the resulting item IDs are then removed via R's gsub() function:
# add a userID column to matrix containing baseline imputations for NA's
long_rjest <- tjest
long_rjest$userID <- seq(1:nrow(tjest))
# convert user-item matrix to long format
long_df <- gather(long_rjest, ItemID, Rating, -userID)
# remove non-numeric characters from item names + convert result to numeric
long_df$ItemID <- as.numeric(gsub("\\D+", "", long_df$ItemID))
head(long_df)
The resulting data frame is then converted to a SparkR data frame and sorted by user ID and item ID via SparkR's arrange() function:
# create SparkR data frame from cleansed data
als_df <- createDataFrame(long_df)
als_df <- arrange(als_df, "userID", "ItemID")
head(als_df)
The SparkR data frame is then split into training and testing subsets via SparkR's randomSplit() function:
# Split the data into two subsets.
splitData <- randomSplit(als_df, weights = c(0.8, 0.2), seed = 42)
df_train <- splitData[[1]]
df_test <- splitData[[2]]
The randomSplit() function proved to be exceedingly slow: Its execution required more than 9 minutes on a DataBricks cluster. Such poor performance likely reflects the overhead associated with implementing the random split within a distributed computing environment.
The ALS model is then generated using SparkR’s spark.als() function and the training subset:
# create Spark R AlS model
model <- spark.als(df_train, "Rating", "userID", "ItemID")
summary(model)
Generation of the ALS model also proved to be an exceedingly slow process: SparkR / DataBricks required nearly 10 minutes to produce the model.
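The spark.als() call above relies on the function's default hyperparameters. As a sketch only, with illustrative and untuned values, the main ALS hyperparameters exposed by SparkR (rank, regParam, maxIter) could be set explicitly as follows:
# illustrative only: explicitly setting the latent-factor rank, regularization strength,
# and iteration count; these values are not tuned for the Jester data
model_explicit <- spark.als(df_train, "Rating", "userID", "ItemID",
                            rank = 10, regParam = 0.1, maxIter = 10)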
Ratings predictions are then generated from the model for the testing subset:
# Make predictions using testing subset
preds <- predict(model, newData=df_test)
head(select(preds, "Rating", "prediction"))
Generating the predictions was also an exceedingly slow process, with SparkR / DataBricks requiring 9.5 minutes to produce the ratings estimates.
The accuracy of the ALS model is then evaluated by calculating its RMSE relative to the matrix that includes the imputed baseline predictor values for the ‘NA’ ratings present within the original data. As a first step toward calculating the RMSE, we extract the original ratings and corresponding ratings predictions from the SparkR data frame generated by the predict() function and ensure that all predicted ratings values are within the valid [-10, 10] ratings range. The RMSE is then calculated by use of basic vector mathematics:
# move predictions from Spark R data frame to a regular R data frame for RMSE calcs
sr_preds <- select(preds, "Rating", "prediction")
als_preds <- take(sr_preds, count(sr_preds))
rm(sr_preds)
# make sure predictions are within valid range
# set all vals > 10 to 10 to ensure items are within valid range
als_preds$prediction[als_preds$prediction > 10] <- 10
# set all vals < -10 to -10
als_preds$prediction[als_preds$prediction < -10] <- -10
# calc RMSE for ALS model
ALS_RMSE <- sqrt(base::mean((als_preds$Rating - als_preds$prediction)^2) )
ALS_RMSE
The SparkR code shown above also proved to be exceedingly slow: This relatively simple set of instructions required 19 minutes of execution time on SparkR / DataBricks to complete.
As can be seen in the DataBricks link provided earlier, the RMSE for the ALS model is 1.962214. However, this result cannot be directly compared to the RMSE calculated earlier for the SVD model: The RMSE calculated for the SVD model did not make use of the imputed values for the ’NA’s contained within the original data set. As such, if we are to make an “apples-to-apples” comparison of the SVD and ALS models, we need to revise our RMSE calculation for the SVD model.
Revising the SVD model’s RMSE calculation to make use of the imputed values for the ‘NA’ ratings is achieved through the use of simple matrix mathematics in R:
# calculate RMSE for SVD model using the imputed NA values as the basis
IMP_SVD_RMSE <- sqrt(base::mean((as.matrix(tjest) - pred_mat)^2) )
IMP_SVD_RMSE
The revised RMSE is 0.9123556, which, while slightly higher than the initial SVD RMSE that excluded the ‘NA’ ratings, is still far better than the RMSE found for the SparkR ALS model.
The RMSE scores derived for the SVD and ALS models are summarized in the table below. As we can see, the SVD matrix factorization model clearly outperforms the SparkR ALS model.
| Model | RMSE |
|---|---|
| SVD (original ratings, NA’s excluded) | 0.8477129 |
| SVD (NA’s replaced with imputed values) | 0.9123556 |
| ALS | 1.962214 |
The SparkR / DataBricks ALS model derivation and evaluation process also suffered from extremely slow performance, which may reflect the overhead required for data management within a distributed computing environment. Furthermore, while SparkR attempts to offer software developers a somewhat abstracted approach to developing algorithms within a distributed computing environment, the function masking that results from many of SparkR's function names being identical to those of base R, combined with the lack of an IDE, serves as a strong caveat to its use.
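As a small illustration of the masking issue, note how the code in this report already namespace-qualifies base functions whose names collide with functions SparkR attaches (sample() is among the functions SparkR masks), for example:
# base R behavior is requested explicitly because SparkR attaches functions with the same names
trows <- base::sample(nrow(rjest), 10000)
RMSE_train <- sqrt(base::mean(train_errs$SE))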
Given these inefficiencies and the SparkR coding/debugging challenges, use of a distributed platform for the 1,000,000-cell utility matrix used here seems entirely unwarranted. In fact, many of the SparkR commands that performed poorly here could have been easily replaced with standard R functions within an RStudio environment, resulting in a significant improvement in the performance of the code.
However, it may be the case that a much larger user-item matrix would benefit from a distributed platform. Unfortunately, there is no easy way to estimate how much larger the matrix would need to be to obtain such a benefit. Perhaps a “trial-and-error” method, wherein progressively larger user-item matrices are tested until we see evidence that a non-distributed platform is unable to complete the required processing, would be appropriate. Future work could include the implementation of such a benchmarking task.