Instruction
The goal of this project is to give you practice beginning to work with a distributed recommender system. It is sufficient for this assignment to build out your application on a single node. Adapt one of your recommendation systems to work with Apache Spark and compare the performance with your previous iteration. Consider the efficiency of the system and the added complexity of using Spark. You may complete the assignment using PySpark (Python), SparkR (R), sparklyr (R), or Scala. Please include in your conclusion: For your given recommender system’s data, algorithm(s), and (envisioned) implementation, at what point would you see moving to a distributed platform such as Spark becoming necessary?
In recent years, recommender systems have become an important part of online platforms. They help users find movies, books, products, and other content based on their preferences. Traditional collaborative filtering methods work well with smaller datasets, but their effectiveness usually drops as the dataset grows. This is where distributed computing platforms like Apache Spark can help.
In this project, we expand the recommender system created in Project 4 by using a distributed environment with Apache Spark. We specifically employ Spark’s MLlib library and the sparklyr package in R to create a matrix factorization model with the Alternating Least Squares (ALS) algorithm.
The main goal of this project is to see how a distributed computing method can effectively scale collaborative filtering techniques for larger datasets. Although the MovieLens 100k dataset used here is relatively small, it serves as a useful test case for getting familiar with the Spark ecosystem. We will learn how to prepare data, build models, and evaluate performance using distributed methods.
We compare this Spark-based ALS model with earlier models in terms of how complex they are to implement, how well they perform, and how scalable they are. We also note potential benefits and drawbacks of using Spark and discuss when it’s necessary to transition from single-node to distributed environments.
In this project, we use the MovieLens 100k dataset, which contains approximately 100,000 movie ratings from 943 users across 1,664 movies. Ratings range from 1 (worst) to 5 (best) and represent users’ preferences.
We explore this dataset using a distributed approach, leveraging the ALS matrix factorization method available in Spark’s MLlib and accessed through the sparklyr package running in local mode.
This setup allows us to explore the benefits of scalable recommendation techniques within a Spark-based environment.
We start by loading the data and preparing it for the recommender system. The full ratings matrix includes over 99,000 ratings out of roughly 1.57 million possible user-movie pairs, which implies that the ratings matrix is about 94% sparse.
[1] "Toy Story (1995)"
[2] "GoldenEye (1995)"
[3] "Four Rooms (1995)"
[4] "Get Shorty (1995)"
[5] "Copycat (1995)"
[6] "Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)"
[7] "Twelve Monkeys (1995)"
[8] "Babe (1995)"
[9] "Dead Man Walking (1995)"
[10] "Richard III (1995)"
10 x 30 sparse Matrix of class "dgCMatrix"
1 5 3 4 3 3 5 4 1 5 3 2 5 5 5 5 5 3 4 5 4 1 4 4 3 4 3 2 4 1 3
2 4 . . . . . . . . 2 . . 4 4 . . . . 3 . . . . . 4 . . . . .
3 . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
4 . . . . . . . . . . 4 . . . . . . . . . . . . . . . . . . .
5 4 3 . . . . . . . . . . . . . . 4 . . . 3 . . 4 3 . . . 4 .
6 4 . . . . . 2 4 4 . . 4 2 5 3 . . . 4 . 3 3 4 . . . . 2 . .
7 . . . 5 . . 5 5 5 4 3 5 . . . . . . . . . 5 3 . 3 . 4 5 3 .
8 . . . . . . 3 . . . 3 . . . . . . . . . . 5 . . . . . . . .
9 . . . . . 5 4 . . . . . . . . . . . . . . . . . . . . . . .
10 4 . . 4 . . 4 . 4 . 4 5 3 . . 4 . . . . . 5 5 . . . . . . .
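As a quick check on that figure, here is a minimal sketch of the sparsity arithmetic, assuming m is the MovieLense realRatingMatrix from recommenderlab (the same object recast in the next step):
# verify the sparsity of the ratings matrix
library(recommenderlab)
data("MovieLense")
m <- MovieLense                     # realRatingMatrix: 943 users x 1664 movies
n_ratings <- length(getRatings(m))  # observed ratings: ~99,392
n_pairs <- nrow(m) * ncol(m)        # possible pairs: 943 * 1664 = 1,569,152
1 - n_ratings / n_pairs             # ~0.937, i.e. about 94% sparse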
In order to use the ALS matrix factorization function in the Apache Spark MLlib package, we need to recast the ratings matrix into a dataframe with numeric (integer) values for user and item IDs. In the process we also save the actual movie names in case we need them later.
# recast ratings matrix into dataframe with numeric user & item IDs
movie_names <- colnames(m)
colnames(m) <- 1:ncol(m)
df <- as(m, "data.frame")
df$user <- as.integer(df$user)
df$item <- as.integer(df$item)
df$rating <- as.integer(df$rating)
str(df)
'data.frame': 99392 obs. of 3 variables:
$ user : int 1 1 1 1 1 1 1 1 1 1 ...
$ item : int 1 2 3 4 5 6 7 8 9 10 ...
$ rating: int 5 3 4 3 3 5 4 1 5 3 ...
user item rating
1 1 1 5
453 1 2 3
584 1 3 4
674 1 4 3
883 1 5 3
969 1 6 5
user item rating
93643 943 1038 2
94227 943 1058 2
94450 943 1065 4
96451 943 1179 3
97147 943 1219 3
98112 943 1320 3
Next we establish a connection to Apache Spark in local mode, and copy the dataframe to a Spark table.
# ---- spark_setup ----
# Set correct Spark version
# spark_home <- spark_install_find(version = "3.1.2", hadoop_version = "3.2")
# Manually set environment variable so sparklyr doesn't look for wrong one
# Sys.setenv(SPARK_HOME = spark_home)
# Sys.setenv(JAVA_HOME = "C:/Program Files/Java/jdk1.8.0_441") # adjust as needed
# Ensure clean logging and connection
# options(sparklyr.log.console = TRUE)
# Connect to Spark using correct version and home
sc <- spark_connect(master = "local", version = "3.1.2")
ratings_tbl <- copy_to(sc, df, "ratings", overwrite = TRUE)
src_tbls(sc)
[1] "ratings"
[1] "tbl_spark" "tbl_sql" "tbl_lazy" "tbl"
Rows: ??
Columns: 3
Database: spark_connection
$ user <int> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, …
$ item <int> 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, …
$ rating <int> 5, 3, 4, 3, 3, 5, 4, 1, 5, 3, 2, 5, 5, 5, 5, 5, 3, 4, 5, 4, 1, …
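The Rows: ?? above is a consequence of Spark’s lazy evaluation: glimpse() reports the row count as unknown because no job has materialized the table yet. When an exact count is needed, sdf_nrow() forces the computation; a minimal sketch:
# force a row count on the lazy Spark table (runs a Spark job)
sdf_nrow(ratings_tbl)                  # 99392
# equivalent via dplyr translation:
ratings_tbl %>% count() %>% collect()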
Next, we examine the ratings data by plotting the distribution of rating counts and average ratings, both per user and per movie. While this analysis generally runs smoothly in an R script, we encountered occasional stability issues when executing the same code within the RStudio Spark environment—specifically during collect() calls. These errors may stem from integration challenges between RStudio and Apache Spark. Despite this, the code reliably generates the intended histograms when run outside the notebook interface.
# distribution of counts & avg rating per user
by_user <- ratings_tbl %>%
group_by(user) %>%
summarise(count = n(), avg = mean(rating, na.rm = TRUE)) %>%
collect()
head(by_user)
# A tibble: 6 × 3
user count avg
<int> <dbl> <dbl>
1 12 51 4.39
2 13 630 3.10
3 14 98 4.09
4 18 277 3.88
5 38 121 3.72
6 46 27 4.11
ggplot(by_user, aes(count)) + geom_histogram() +
labs(title = "Distribution of Rating Count per User",
x = "Rating Count")
The histogram shows how many ratings each user provided. Most users rated only a small number of movies, while a few users rated many. This imbalance is typical in recommendation datasets and may affect model performance, as users with fewer ratings contribute less information to the learning process.
ggplot(by_user, aes(avg)) + geom_histogram() +
labs(title = "Distribution of Average Rating per User",
x = "Average Rating")
The histogram displays the distribution of average ratings given by users. Most users tend to rate movies positively, with averages clustering around 3 to 4. This indicates a possible bias toward higher ratings, which is common in user-generated rating datasets.
user count avg
Min. : 1.0 Min. : 19.0 Min. :1.497
1st Qu.:236.5 1st Qu.: 32.0 1st Qu.:3.325
Median :472.0 Median : 64.0 Median :3.619
Mean :472.0 Mean :105.4 Mean :3.588
3rd Qu.:707.5 3rd Qu.:147.5 3rd Qu.:3.871
Max. :943.0 Max. :735.0 Max. :4.870
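Given the collect() instability noted above, one mitigation is to push the aggregation into Spark and collect only a small binned summary instead of one row per user. A minimal sketch (the bin width of 25 is an arbitrary choice):
# bin per-user rating counts in Spark; collect only the bin totals
user_bins <- ratings_tbl %>%
  group_by(user) %>%
  summarise(count = n()) %>%
  mutate(bin = floor(count / 25) * 25) %>%
  group_by(bin) %>%
  summarise(n_users = n()) %>%
  collect()
ggplot(user_bins, aes(bin, n_users)) + geom_col() +
  labs(title = "Rating Count per User (binned in Spark)",
       x = "Rating Count", y = "Users")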
# distribution of counts & avg rating per movie
by_item <- ratings_tbl %>%
group_by(item) %>%
summarise(count = n(), avg = mean(rating, na.rm = TRUE)) %>%
collect()
head(by_item)
# A tibble: 6 × 3
item count avg
<int> <dbl> <dbl>
1 12 267 4.39
2 13 184 3.42
3 14 183 3.97
4 18 10 2.8
5 38 120 3.01
6 46 27 3.56
# note: par(mfrow) has no effect on ggplot2 output; each histogram prints on its own
ggplot(by_item, aes(count)) + geom_histogram() +
labs(title = "Distribution of Rating Count per Movie",
x = "Rating Count")
Most movies received relatively few ratings, while a small number were rated frequently. This long-tail distribution is common in recommendation datasets and highlights item popularity imbalance, which can affect recommendation diversity.
ggplot(by_item, aes(avg)) + geom_histogram() +
labs(title = "Distribution of Avg. Rating per Movie",
x = "Average Rating")
Most movies have average ratings between 3 and 4, indicating a general tendency toward moderate to positive feedback. Very few movies receive consistently low or high ratings, suggesting a balanced perception across users.
item count avg
Min. : 1.0 Min. : 1.00 Min. :1.000
1st Qu.: 416.8 1st Qu.: 7.00 1st Qu.:2.665
Median : 832.5 Median : 27.00 Median :3.162
Mean : 832.5 Mean : 59.73 Mean :3.077
3rd Qu.:1248.2 3rd Qu.: 80.00 3rd Qu.:3.653
Max. :1664.0 Max. :583.00 Max. :5.000
We will develop a recommender model based on the alternating least squares algorithm (ALS) using the ml_als function in sparklyr. We follow these steps:
- Partition the ratings data into training and test sets in an 80/20 split.
- Train the model using the training partition, partitions$training.
- Use the model to predict ratings for the test partition, partitions$test.
- Compute the model accuracy in terms of the root mean squared error (RMSE) metric.

However, the initial ml_als calls to Apache Spark ran into problems, and Spark ended up terminating the process. I tried using the ml_als_factorization function referenced in this week’s class materials, but that function was deprecated in an earlier version of sparklyr. I also logged a new issue on the sparklyr issues page; the developer’s suggestion was to install a different version of the local Spark application. I tried Spark versions 2.4.3 (the current version at the time), 2.3, 2.2, and 2.1, but the ml_als call didn’t work with any of them. Below, we retry with a fresh Spark 3.1.2 connection, a leaner configuration, and integer-recoded IDs, which succeeds.
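For reference, ALS alternates between solving for the user factors and the item factors, each time minimizing a regularized squared error over the set \(\mathcal{K}\) of observed ratings. In the notation below, rank sets the dimension of the factor vectors and reg_param supplies \(\lambda\) (Spark’s MLlib additionally scales the regularization per user and item, the ALS-WR variant):
\[
\min_{U, V} \sum_{(u,i) \in \mathcal{K}} \left( r_{ui} - \mathbf{u}_u^{\top} \mathbf{v}_i \right)^2 + \lambda \left( \sum_u \lVert \mathbf{u}_u \rVert^2 + \sum_i \lVert \mathbf{v}_i \rVert^2 \right)
\]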
# Reload Spark cleanly
config <- spark_config()
config$spark.driver.memory <- "2G"
config$spark.sql.shuffle.partitions <- "2"
sc <- spark_connect(master = "local", version = "3.1.2", config = config)
# Re-clean: convert to safe numeric integer IDs with no overlap or reuse
df_clean <- df %>%
transmute(
user_id = as.integer(as.factor(user)),
item_id = as.integer(as.factor(item)),
rating = as.integer(rating)
) %>%
filter(!is.na(user_id), !is.na(item_id), !is.na(rating))
# Make sure the Spark table is fresh
if ("ratings_clean" %in% src_tbls(sc)) {
DBI::dbRemoveTable(sc, "ratings_clean")
}
ratings_tbl <- copy_to(sc, df_clean, "ratings_clean", overwrite = TRUE)
# Sample the dataset
ratings_sample <- ratings_tbl %>% sdf_sample(fraction = 0.05, seed = 123)
# Split into train/test
partitions <- ratings_sample %>% sdf_random_split(training = 0.8, test = 0.2, seed = 456)
# Fit ALS model using clean column names
als_model <- ml_als(
x = partitions$training,
rating_col = "rating",
user_col = "user_id",
item_col = "item_id",
rank = 10,
max_iter = 10,
reg_param = 0.1,
nonnegative = TRUE,
cold_start_strategy = "drop"
)
# Evaluate RMSE
estimate_rmse <- function(df){
ml_predict(als_model, df) %>%
mutate(resid = rating - prediction) %>%
summarise(rmse = sqrt(mean(resid ^ 2))) %>%
collect()
}
training_rmse <- estimate_rmse(partitions$training)
test_rmse <- estimate_rmse(partitions$test)
training_rmse
# A tibble: 1 × 1
rmse
<dbl>
1 0.161
test_rmse
# A tibble: 1 × 1
rmse
<dbl>
1 1.32
With the fresh Spark connection and the cleaned integer IDs, the ml_als call succeeded, so we were able to carry out the original plan: predict ratings for the test set with the ALS factorization model and compute the RMSE. The gap between the training RMSE (0.161) and the test RMSE (1.32) suggests the model overfits the small 5% sample used here; fitting on the full ratings table should narrow that gap.
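With a fitted model available, a natural next step is producing top-N recommendations rather than point predictions. A minimal sketch using sparklyr’s ml_recommend() on the model above (n = 5 is an arbitrary choice; the movie_names vector saved earlier can map item IDs back to titles):
# top-5 recommended items per user from the fitted ALS model
top5 <- ml_recommend(als_model, type = "items", n = 5) %>% collect()
head(top5)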
Performance Comparison: recommenderlab vs Spark MLlib
We compare the performance of two recommendation approaches: UBCF from the recommenderlab package and ALS from Spark MLlib. We measure both models using RMSE and training time on the same MovieLens 100k dataset.
# ---- recommenderlab (UBCF) ----
data("MovieLense")
scheme <- evaluationScheme(MovieLense, method = "split", train = 0.8, given = 10, goodRating = 4)
start_time_reco <- Sys.time()
model_reco <- Recommender(getData(scheme, "train"), method = "UBCF")
pred_reco <- predict(model_reco, getData(scheme, "known"), type = "ratings")
end_time_reco <- Sys.time()
rmse_reco <- calcPredictionAccuracy(pred_reco, getData(scheme, "unknown"))["RMSE"]
time_reco <- round(as.numeric(difftime(end_time_reco, start_time_reco, units = "secs")), 2)
# ---- Spark ALS ----
sc <- spark_connect(master = "local", version = "3.1.2")
df <- as(MovieLense, "data.frame") %>%
mutate(
user = as.integer(as.factor(user)),
item = as.integer(as.factor(item)),
rating = as.numeric(rating)
)
ratings_tbl <- copy_to(sc, df, overwrite = TRUE)
splits <- ratings_tbl %>% sdf_random_split(training = 0.8, test = 0.2, seed = 42)
training_tbl <- splits$training
test_tbl <- splits$test
# time training and prediction together, to match the UBCF measurement above
start_time_spark <- Sys.time()
als_model <- ml_als(
training_tbl,
rating_col = "rating",
user_col = "user",
item_col = "item",
cold_start_strategy = "drop"
)
# keep only test users/items that appear in training (redundant with
# cold_start_strategy = "drop", but makes the filtering explicit)
test_tbl_clean <- test_tbl %>%
inner_join(training_tbl %>% select(user) %>% distinct(), by = "user") %>%
inner_join(training_tbl %>% select(item) %>% distinct(), by = "item")
predictions <- ml_predict(als_model, test_tbl_clean)
rmse_spark <- ml_regression_evaluator(predictions, label_col = "rating",
prediction_col = "prediction", metric_name = "rmse")
end_time_spark <- Sys.time()
time_spark <- round(as.numeric(difftime(end_time_spark, start_time_spark, units = "secs")), 2)
# ---- Results Summary ----
comparison_df <- data.frame(
Model = c("UBCF (recommenderlab)", "ALS (Spark MLlib)"),
RMSE = c(round(rmse_reco, 4), round(rmse_spark, 4)),
Time_Sec = c(time_reco, time_spark)
)
knitr::kable(comparison_df, caption = "RMSE and Runtime Comparison: recommenderlab vs Spark")
| Model | RMSE | Time_Sec |
|---|---|---|
| UBCF (recommenderlab) | 1.2368 | 1.48 |
| ALS (Spark MLlib) | 0.9174 | 2.14 |
We evaluated both models using the MovieLens 100k dataset. The recommenderlab version used a user-based collaborative filtering (UBCF) algorithm, while the sparklyr implementation used the Alternating Least Squares (ALS) matrix factorization method.
The table above summarizes RMSE and training/prediction time for both models. While UBCF trains quickly and performs reasonably well on small datasets, Spark ALS achieves a lower RMSE here and demonstrates better scalability potential with large-scale data due to its distributed design.
As dataset size and complexity increase—particularly with millions of users and items—a Spark-based recommender system becomes essential to support scalable, parallelized, and production-level recommendation pipelines.
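Some rough arithmetic makes that tipping point concrete. Memory-based UBCF implicitly works with a user-user similarity matrix that grows quadratically with the user count, while ALS stores only the two factor matrices. An illustrative sketch, assuming 8-byte doubles and rank 10 (both figures chosen for illustration):
# back-of-envelope memory: dense user-user similarity (UBCF) vs ALS factors
sim_gb <- function(n_users) n_users^2 * 8 / 1e9
sim_gb(943)    # MovieLens 100k: ~0.007 GB, trivial on one node
sim_gb(1e5)    # 100,000 users: ~80 GB, beyond typical single-node memory
als_gb <- function(n_users, n_items, rank = 10) (n_users + n_items) * rank * 8 / 1e9
als_gb(1e5, 5e4)   # ~0.012 GB of factors; the bottleneck becomes computation, not storage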
In this project I developed a recommender system for the MovieLens 100k dataset. The approach was to build a recommender model based on the ALS factorization technique, using the sparklyr package to access the MLlib library on Apache Spark. On the positive side, I was able to load the dataset into a Spark table, do some exploratory analysis, and return the results to RStudio. The hardest part was building the model itself: the initial ml_als calls were not usable with my original environment setup, and a working model only materialized after reconnecting with Spark 3.1.2, recoding the user and item IDs as integers, and adjusting the Spark configuration. Once fitted, the ALS model outperformed the earlier UBCF model on RMSE (0.9174 vs. 1.2368).
Some observations and findings from this project include:
Complexity of new systems: It was clear from this project that learning to use Apache Spark productively will take a substantial investment of time and effort in new platforms and applications (e.g., sparklyr, Databricks, the MLlib library). This includes becoming familiar with the Spark ecosystem of functions and libraries, learning new syntax and programming logic, reviewing error logs, etc.

Stability of new applications: For this project I used a development version of sparklyr (v1.0.1.9004) and a “preview release” of RStudio (v1.2.1555), as suggested on the sparklyr description page. While working with applications under development may offer cutting-edge capabilities and new techniques, the risk is that stability may suffer, as these applications may not be fully stabilized, tested, and integrated with each other.

Efficiency in dealing with large datasets: For large datasets of more than 10 million ratings, for instance, the advantages of distributed computing should easily outweigh the costs of switching to and learning new application systems. From my last project working with the MovieLens 100k dataset using recommenderlab in RStudio, it was apparent that efficiency limits were close at hand, as some algorithms took on the order of 5-10 minutes to complete. Scaling this up by a factor of 100x (from 100k to 10 million ratings) would imply single-node processing times on the order of 8-16 hours; see the sketch below.
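The runtime extrapolation in that last observation is simple linear scaling; a sketch of the arithmetic (assuming runtime grows at least linearly in the number of ratings):
# extrapolate single-node runtime from 100k ratings to 10 million (a 100x increase)
minutes_100k <- c(5, 10)               # observed range on MovieLens 100k
hours_10m <- minutes_100k * 100 / 60   # linear scaling assumption
hours_10m                              # ~8.3 to 16.7 hours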