The goal of this project is to give you practice beginning to work with a distributed recommender system. It is sufficient for this assignment to build out your application on a single node.
Adapt one of your recommendation systems to work with Apache Spark and compare the performance with your previous iteration. Consider the efficiency of the system and the added complexity of using Spark. You may complete the assignment using PySpark (Python), SparkR (R), sparklyr (R), or Scala.
Please include in your conclusion: For your given recommender system’s data, algorithm(s), and (envisioned) implementation, at what point would you see moving to a distributed platform such as Spark becoming necessary?
You may work on any platform of your choosing, including Databricks Community Edition or in local mode. You are encouraged but not required to work in a small group on this project.
In this assignment, we create two alternating least squares (ALS) recommenders, one with recommenderlab and one with Apache Spark via sparklyr, and compare their performance. Our goal is to gain experience with Spark and begin to assess its advantages over a non-distributed platform.

Our data source is the Jester dataset, which contains ratings from 24,983 users who have each rated 36 or more jokes. Ratings are real values ranging from -10.00 to +10.00; a value of 99 indicates that the joke was not rated. Each row in the dataset represents a different user, and the first column gives the total number of jokes the individual has rated. The remaining 100 columns give the ratings for each joke.

First, we will load the Jester dataset. We will remove the column with the number of rated jokes because it is not used in the recommendation system. The raw data represents non-rated jokes with the number 99, so we will replace these values with NA. Finally, we will subset the data to the first 5,000 users to speed up computation.
# Load required packages
library(readxl)         # read_xls()
library(dplyr)          # arrange(), %>%
library(tidyr)          # gather()
library(recommenderlab) # ALS recommender and evaluation scheme
library(sparklyr)       # R interface to Apache Spark
library(tictoc)         # tic()/toc() timing
library(MLmetrics)      # assumed source of the RMSE/MSE/MAE helpers used below

# Read jester data
jester <- data.frame(read_xls("jester-data-1.xls", col_names = FALSE))
colnames(jester) <- c("ratingCount", seq(100))
row.names(jester) <- 1:nrow(jester)
# Remove num jokes column
ratings <- jester[-1]
# Replace 99 (no rating) with NA
ratings[ratings == 99] <- NA
# Subset to first 5000 users
ratings <- ratings[1:5000, ]
# Create user- and item-specific rating counts
jokeCount <- rowSums(!is.na(ratings))  # jokes rated per user
userCount <- colSums(!is.na(ratings))  # users rating each joke

First, let's take a look at the number of jokes that each user has rated. The dataset only includes users who have rated at least 36 jokes, and it appears that most individuals have rated either around 70 jokes or all 100.
hist(jokeCount,
     main = 'Number of Jokes Rated per User',
     xlab = 'Number of Jokes Rated',
     ylab = 'Number of Users')

Next, we can look at the number of ratings that each joke has received. We can see that many of the jokes were rated by all 5,000 users.
hist(userCount,
     main = 'Number of Individuals Rating each Joke',
     xlab = 'Number of Users that Rated Joke',
     ylab = 'Number of Jokes')

Now we can take a look at the ratings across all jokes. The mean rating is about 0.89 and the median rating is 1.5.
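The code behind these summary statistics is not shown in the original; a two-line sketch that would produce the output below:

# Summary statistics over all observed ratings (sketch)
print(paste("Mean Rating :", mean(as.matrix(ratings), na.rm = TRUE)))
print(paste("Median Rating:", median(as.matrix(ratings), na.rm = TRUE)))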
## [1] "Mean Rating : 0.890212310680368"
## [1] "Median Rating: 1.5"
A histogram of the ratings shows a left skew (a longer tail of negative ratings), which means that most ratings are positive.
# Histogram of ratings
hist(as.matrix(ratings),
     main = "Histogram of ratings",
     xlab = 'Rating',
     ylab = 'Number of Ratings')

Finally, we will impute the missing values with the median rating for each joke.
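The imputation code itself is not shown; below is a minimal sketch of the column-median approach described above. The names ratings_impute and recclab_ratings are assumed to match the objects used in the models that follow:

# Impute each joke's missing ratings with that joke's median rating (sketch)
ratings_impute <- as.matrix(ratings)
for (j in seq_len(ncol(ratings_impute))) {
  med <- median(ratings_impute[, j], na.rm = TRUE)
  ratings_impute[is.na(ratings_impute[, j]), j] <- med
}
# Coerce to the realRatingMatrix class that recommenderlab expects
recclab_ratings <- as(ratings_impute, "realRatingMatrix")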
We compare ALS recommenders built using recommenderlab and Spark via sparklyr.
We start with recommenderlab. We split the ratings matrix into train (80%) and test (20%) sets; each test user is given 15 joke ratings, and we set the "good rating" threshold to 2, given the mean and median ratings observed above. Next, we train our ALS recommender with 10 latent factors and 10 iterations and use it to predict ratings. Prediction takes a substantial amount of time (over six minutes).
# Define an evaluation scheme
set.seed(612)
trainPct <- 0.8
toKeep <- 15
ratingThreshold <- 2

tic("Train - recommenderlab")
e <- evaluationScheme(recclab_ratings,
                      method = "split",
                      train = trainPct,
                      given = toKeep,
                      goodRating = ratingThreshold)
# Train the ALS recommender
recclab_recc <- Recommender(getData(e, "train"),
                            method = "ALS",
                            param = list(n_factors = 10,
                                         lambda = 0.1,
                                         n_iterations = 10,
                                         seed = 612))
toc(log = TRUE, quiet = TRUE)

# Predict ratings for the test users
start <- Sys.time()
tic("Predict - recommenderlab")
recclab_pred <- predict(recclab_recc, getData(e, "known"), type = "ratings")
toc(log = TRUE, quiet = TRUE)
stop <- Sys.time()

We can see the prediction error as well as the total time it takes to predict.
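The accuracy calculation is not shown in the original code; a sketch using recommenderlab's calcPredictionAccuracy() helper:

# Compare predictions against the held-out ("unknown") ratings
(recclab_error <- calcPredictionAccuracy(recclab_pred, getData(e, "unknown")))
# Total prediction time
stop - start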
## RMSE MSE MAE
## 4.382432 19.205711 3.115437
## Time difference of 6.222422 mins
Next, we move to Spark and sparklyr. We start by restructuring the ratings data to facilitate modeling. Each row in this new dataset will represent a user’s rating for a particular joke.
# Reshape the imputed ratings matrix into long (user, item, rating) format
ratings_df <- as.data.frame(ratings_impute)
ratings_df$user <- 1:nrow(ratings_df)
ratings_df <- ratings_df %>% gather("item", "rating", -user)
ratings_df$item <- as.numeric(ratings_df$item)
ratings_df$rating <- as.numeric(ratings_df$rating)
ratings_df <- arrange(ratings_df, user, item)

Our work in Spark starts with establishing a local connection. Here we'll connect to a local instance of Spark via the spark_connect function:
# sc <- spark_connect(master = "local", spark_home = "C:/Spark/spark-2.4.6-bin-hadoop2.7/tmp/hadoop")
sc <- spark_connect(master = "local")

We create a tbl_spark object containing the long ratings dataframe.
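The copy step is not shown in the original; a sketch using sparklyr's copy_to():

# Copy the long ratings dataframe into Spark as a tbl_spark
sparklyr_ratings <- copy_to(sc, ratings_df, "ratings_df", overwrite = TRUE)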
Next, we will create an ALS recommender with parameters that mirror those of the model built with recommenderlab. We can log the start and end time for training and prediction.
# Train
tic("Train - sparklyr")
sparklyr_recc <- ml_als(sparklyr_ratings,
                        rating_col = "rating",
                        user_col = "user",
                        item_col = "item",
                        rank = 10,
                        reg_param = 0.1,
                        max_iter = 10)
toc(log = TRUE, quiet = TRUE)

# Predict
sparkStart <- Sys.time()
tic("Predict - sparklyr")
sparklyr_pred <- ml_predict(sparklyr_recc, sparklyr_ratings)
toc(log = TRUE, quiet = TRUE)
sparkEnd <- Sys.time()

We can also look at the calculation errors and total time for prediction.
sparklyr_df <- as.data.frame(sparklyr_pred)
# Error metrics (RMSE/MSE/MAE assumed to come from the MLmetrics package)
(sparklyr_error <- c(
  "RMSE" = RMSE(sparklyr_df$rating, sparklyr_df$prediction),
  "MSE"  = MSE(sparklyr_df$rating, sparklyr_df$prediction),
  "MAE"  = MAE(sparklyr_df$rating, sparklyr_df$prediction)
))

##     RMSE      MSE      MAE
## 3.155573 9.957642 2.253420
## Time difference of 0.1000118 secs
Now that we've run models using both recommenderlab and sparklyr, we can compare their performance in terms of computation time. This is easy to do using the tictoc package.
log <- as.data.frame(unlist(tic.log(format = TRUE)))
colnames(log) <- c("Run Time")
knitr::kable(log, format = "html") %>%
  kableExtra::kable_styling(bootstrap_options = c("striped", "hover"))

| Run Time |
|---|
| Train - recommenderlab: 12.06 sec elapsed |
| Predict - recommenderlab: 373.35 sec elapsed |
| Train - sparklyr: 13.61 sec elapsed |
| Predict - sparklyr: 0.09 sec elapsed |
We can also see the difference in errors. One caveat: the sparklyr model is scored on the same ratings it was trained on, while the recommenderlab model is scored on held-out test users, so the two sets of errors are not directly comparable:
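A sketch of how the comparison below might be produced (recclab_error is the accuracy vector computed earlier):

# Stack the two error vectors into a comparison table
rbind(sparklyr_error, recclab_error)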
## RMSE MSE MAE
## sparklyr_error 3.155573 9.957642 2.253420
## recclab_error 4.382432 19.205711 3.115437
At what point would you see moving to a distributed platform such as Spark becoming necessary? Even with this small dataset (5,000 users, 100 jokes, 500,000 ratings), we see a drastic improvement in prediction time by using Spark. Although the recommenderlab system only took about six minutes to generate predictions, it would be advantageous to speed this up for development purposes. That being said, using Spark for any system trained on a dataset of more than 100,000 ratings or so would be of value.
More practically, a distributed platform becomes necessary when the data or computation no longer fits in a single machine's memory. A platform like Spark splits the computation into chunks and manages the scheduling so that the job can complete without crashing.