Implementing a recommender in Spark

Methodology: In project 4, I ran CF with 200 users on my own machine. I will try to repeat the same process using a Spark data frame and assess whether 1) the processing is more efficient, 2) the code needs to be adjusted significantly, and 3) the results are different. I will run the recommender using one node, locally. Although both versions are run on the same machine with the same size data set, the way Spark processes data, breaking data down into smaller parallel tasks and loading data in-memory, could mean very different processing times.

Note: I’ve gauged processing time with system.time(). When I knit this document, the times can differ slightly from the ones I reference in the text, since all processes must re-run. Times may also differ if this document is run on a different machine.

library(tidyverse)
## ── Attaching core tidyverse packages ──────────────────────── tidyverse 2.0.0 ──
## ✔ dplyr     1.1.4     ✔ readr     2.1.6
## ✔ forcats   1.0.1     ✔ stringr   1.5.2
## ✔ ggplot2   4.0.1     ✔ tibble    3.3.1
## ✔ lubridate 1.9.4     ✔ tidyr     1.3.1
## ✔ purrr     1.1.0     
## ── Conflicts ────────────────────────────────────────── tidyverse_conflicts() ──
## ✖ dplyr::filter() masks stats::filter()
## ✖ dplyr::lag()    masks stats::lag()
## ℹ Use the conflicted package (<http://conflicted.r-lib.org/>) to force all conflicts to become errors
library(recommenderlab)
## Loading required package: Matrix
## 
## Attaching package: 'Matrix'
## 
## The following objects are masked from 'package:tidyr':
## 
##     expand, pack, unpack
## 
## Loading required package: arules
## 
## Attaching package: 'arules'
## 
## The following object is masked from 'package:dplyr':
## 
##     recode
## 
## The following objects are masked from 'package:base':
## 
##     abbreviate, write
## 
## Loading required package: proxy
## 
## Attaching package: 'proxy'
## 
## The following object is masked from 'package:Matrix':
## 
##     as.matrix
## 
## The following objects are masked from 'package:stats':
## 
##     as.dist, dist
## 
## The following object is masked from 'package:base':
## 
##     as.matrix
library(sparklyr)
## 
## Attaching package: 'sparklyr'
## 
## The following object is masked from 'package:purrr':
## 
##     invoke
## 
## The following object is masked from 'package:stats':
## 
##     filter

Loading data and cleaning

Here, I load the anime data frame from project 4. I could read the CSV directly in Spark and transform it there, but because I already have code that shapes this DF the way I need it, I will keep the cleaning from last time and convert the result to a Spark DF with copy_to.

#read CSVs
user_ratings <- read.csv('https://raw.githubusercontent.com/samanthabarbaro/data612_recommender_systems/refs/heads/main/animeratings.csv', na.strings = c("", "NA", "null", "NULL"),  header = TRUE)

item_info <- read.csv('https://raw.githubusercontent.com/samanthabarbaro/data612_recommender_systems/refs/heads/main/anime.csv', na.strings = c("", "NA", "null", "NULL"),  header = TRUE)
user_ratings <- user_ratings |>
  filter(rating > -1)


#remove strangely imported text from item info first
item_info$name <- gsub("&quot;", "", item_info$name)
item_info$name <- gsub("amp;", "", item_info$name)
item_info$name <- gsub("#039;", "", item_info$name)


#join
user_item <- left_join(user_ratings, item_info |>
                         select(anime_id, name), by = "anime_id")

#remove the anime ID
user_item <- user_item |>
  select(user_id, rating, name)

dim(user_item)
## [1] 854113      3
#keep users who rated 15 + shows
user_item_15 <- user_item |>
  group_by(user_id) |>
  filter(n() >= 15) |>
  ungroup()

#we lose about 15k ratings
dim(user_item_15)
## [1] 839357      3
n_distinct(user_item$user_id)
## [1] 9555
#and about 2500 users out of 9555

n_distinct(user_item_15$user_id)
## [1] 6978

Create the item matrix

This data will be in a wide format.

#break up genre rows
anime_rows <- separate_longer_delim(item_info, cols = genre, delim = ",")

anime_rows <- anime_rows |> select(name, genre, type)


item_features <- anime_rows |>
  distinct(name, genre) |>
  mutate(genre_val = 1)


#go wide (again)
item_features_wide <- item_features |>
  mutate(genre = str_trim(genre)) |>   
  distinct(name, genre, genre_val) |>
  pivot_wider(
    names_from = genre,
    values_from = genre_val,
    values_fill = 0
  )

There is a genre called NA for shows that don’t have genres. I will keep it - otherwise these items don’t have any attributes. But I will rename it “none” to avoid confusion.

count(item_features_wide |> dplyr::filter(`NA` == 1))
## # A tibble: 1 × 1
##       n
##   <int>
## 1    62
#rename
item_features_wide <- item_features_wide |>
  rename(None = `NA`)

Connect to Spark

Here, I connect locally to spark version 3.5.

#see available versions
spark_available_versions()
##   spark
## 1   2.4
## 2   3.0
## 3   3.1
## 4   3.2
## 5   3.3
## 6   3.4
## 7   3.5
## 8   4.0
## 9   4.1
#spark_install(version = "3.5")


connect_time <- system.time(sc <- spark_connect(master = "local",
                                                version = "3.5"))

sc
## $master
## [1] "local[16]"
## 
## $method
## [1] "shell"
## 
## $app_name
## [1] "sparklyr"
## 
## $config
## $spark.env.SPARK_LOCAL_IP.local
## [1] "127.0.0.1"
## 
## $sparklyr.connect.csv.embedded
## [1] "^1.*"
## 
## $spark.sql.legacy.utcTimestampFunc.enabled
## [1] TRUE
## 
## $sparklyr.connect.cores.local
## [1] 16
## 
## $spark.sql.shuffle.partitions.local
## [1] 16
## 
## $sparklyr.shell.name
## [1] "sparklyr"
## 
## 
## $state
## <environment: 0x0000020222c28820>
## 
## $extensions
## $extensions$jars
## character(0)
## 
## $extensions$packages
## character(0)
## 
## $extensions$initializers
## list()
## 
## $extensions$catalog_jars
## character(0)
## 
## $extensions$repositories
## character(0)
## 
## $extensions$dbplyr_sql_variant
## $extensions$dbplyr_sql_variant$scalar
## list()
## 
## $extensions$dbplyr_sql_variant$aggregate
## list()
## 
## $extensions$dbplyr_sql_variant$window
## list()
## 
## 
## 
## $spark_home
## [1] "C:\\Users\\Sam Barbaro\\AppData\\Local\\spark\\spark-3.5.8-bin-hadoop3"
## 
## $backend
## A connection with                              
## description "->localhost:8882"
## class       "sockconn"        
## mode        "wb"              
## text        "binary"          
## opened      "opened"          
## can read    "yes"             
## can write   "yes"             
## 
## $monitoring
## A connection with                              
## description "->localhost:8882"
## class       "sockconn"        
## mode        "wb"              
## text        "binary"          
## opened      "opened"          
## can read    "yes"             
## can write   "yes"             
## 
## $gateway
## A connection with                              
## description "->localhost:8881"
## class       "sockconn"        
## mode        "rb"              
## text        "binary"          
## opened      "opened"          
## can read    "yes"             
## can write   "yes"             
## 
## $output_file
## [1] "C:\\Users\\SAMBAR~1\\AppData\\Local\\Temp\\RtmpKAHamr\\filee65460e0eb3_spark.log"
## 
## $sessionId
## [1] 86873
## 
## $home_version
## [1] "3.5"
## 
## attr(,"class")
## [1] "spark_connection"       "spark_shell_connection" "DBIConnection"
#check processes
#spark_web(sc)

connect_time
##    user  system elapsed 
##    0.31    0.09   14.69

It takes about 15 seconds to connect to Spark.

Copy the data frame into a spark data frame

For what I will do in Spark, the ratings table needs to be in a long format, unlike a ratings matrix. With CF, I will be joining based on user and item, not an individual user’s set of ratings.

Item features will be in a wide format because getting cosine similarity requires each item as a single row.
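To make the two layouts concrete, here is a minimal base R sketch. The users, titles, and ratings are made up for illustration and are not taken from the anime data.

```r
# Hypothetical toy ratings in long format: one row per (user, item) pair
ratings_long <- data.frame(
  user_id = c(1, 1, 2, 3, 3),
  name    = c("A", "B", "A", "B", "C"),
  rating  = c(8, 6, 9, 7, 10)
)

# Wide format: one row per user, one column per item, NA where unrated
ratings_wide <- tapply(ratings_long$rating,
                       list(ratings_long$user_id, ratings_long$name),
                       FUN = mean)

print(ratings_wide)
dim(ratings_long)  # rows = observed ratings
dim(ratings_wide)  # rows = users, columns = items
```

The long table only stores ratings that exist, which is why it suits joins on user and item; the wide table has a cell for every user-item pair, including the unrated ones.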

Copying the user matrix (name is how it appears in the connections tab):

copy_time <- system.time(spark_anime_user <- copy_to(sc, 
                              user_item_15, 
                              name = "user_item_15", 
                              overwrite = TRUE))

copy_time
##    user  system elapsed 
##    2.06    0.25   18.79

Copying the item matrix:

copy_time2 <- system.time(spark_anime_items <- copy_to(sc, 
                              item_features_wide, 
                              name = "item_features_wide", 
                              overwrite = TRUE))

copy_time2
##    user  system elapsed 
##    1.17    0.11    3.36

Create content-based collaborative filtering using Spark DFs

I will calculate recall/precision@30, RMSE, NDCG to compare this model to my previous CF model.

I will not use the full data set, but the 200-user portion I used in my first iteration, so I can compare processing times directly even though a few other adjustments are made. Using copy_to to create a Spark DF means that the data will be split across partitions and processed in parallel, and that Spark’s lazy evaluation applies, even on a single node. The code could not stay exactly the same: the R version used matrices to look up similarities, while the Spark version, which relies on dplyr transformations, called for long-format data. So the goal is the same, but the approach is adjusted.

Calculate cosine similarity:

library(Matrix)

item_features <- spark_anime_items %>% collect()
item_names <- item_features$name
X <- as.matrix(item_features %>% select(-name))
rownames(X) <- item_names

# Cosine similarity via normalized dot product 
#(much faster than proxy::simil at this scale)
norms <- sqrt(rowSums(X^2))
norms[norms == 0] <- 1e-9
X_norm <- X / norms
sim_matrix <- X_norm %*% t(X_norm)
diag(sim_matrix) <- 0  # exclude self-similarity
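As a small check on the normalized dot product approach, the sketch below applies the same steps to a made-up item-by-genre matrix (the shows and genres are hypothetical) and compares one entry against the cosine formula computed directly.

```r
# Hypothetical item-by-genre matrix (1 = item has the genre)
X_toy <- rbind(
  show_a = c(action = 1, comedy = 1, drama = 0),
  show_b = c(action = 1, comedy = 0, drama = 0),
  show_c = c(action = 0, comedy = 0, drama = 1)
)

# Divide each row by its Euclidean norm, then take all pairwise dot products
norms_toy  <- sqrt(rowSums(X_toy^2))
X_toy_norm <- X_toy / norms_toy
sim_toy    <- X_toy_norm %*% t(X_toy_norm)

# Cosine similarity for one pair computed directly, for comparison
direct_ab <- sum(X_toy["show_a", ] * X_toy["show_b", ]) /
  (sqrt(sum(X_toy["show_a", ]^2)) * sqrt(sum(X_toy["show_b", ]^2)))

round(sim_toy, 3)
all.equal(sim_toy["show_a", "show_b"], direct_ab)
```

Items that share no genres (show_a and show_c here) get a similarity of 0, which matters later when zero-similarity pairs are dropped.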

There are 8k items, so the full similarity matrix in long format would be too large for Spark on one node. I will keep only the top 50 neighbors per item to make this more efficient. I’m wrapping each part of the algorithm in system.time to see how long it takes to run, even though some of the functions here don’t run in Spark. I’ve excluded the cosine similarity calculations from the total run time for both versions of the CF algorithm, since none of that happens in Spark.

#50 neighbors
top_k <- 50

spark_time <- system.time({
  item_sim_long <- lapply(seq_len(nrow(sim_matrix)), function(i) {
    row <- sim_matrix[i, ]
    top_idx <- order(row, decreasing = TRUE)[1:top_k]
    data.frame(
      item_i = item_names[i],
      item_j = item_names[top_idx],
      sim = row[top_idx]
    )
  }) %>% bind_rows() %>% filter(sim > 0)
  
  spark_item_sim <- copy_to(sc, item_sim_long,
                            name = "item_sim", overwrite = TRUE)
})

spark_time
##    user  system elapsed 
##   34.93    3.31   42.96
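The top-k truncation can be illustrated on a tiny similarity matrix. This is a sketch with made-up similarities and a hypothetical k of 2, following the same order-then-subset logic as the chunk above.

```r
# Hypothetical 4-item similarity matrix with self-similarity zeroed out
items <- c("A", "B", "C", "D")
sim_small <- matrix(c(0.0, 0.9, 0.2, 0.0,
                      0.9, 0.0, 0.5, 0.1,
                      0.2, 0.5, 0.0, 0.7,
                      0.0, 0.1, 0.7, 0.0),
                    nrow = 4, byrow = TRUE,
                    dimnames = list(items, items))

top_k_toy <- 2

# For each item, keep its top_k_toy most similar neighbors in long format
neighbors <- do.call(rbind, lapply(items, function(i) {
  row <- sim_small[i, ]
  top_idx <- order(row, decreasing = TRUE)[seq_len(top_k_toy)]
  data.frame(item_i = i, item_j = items[top_idx], sim = unname(row[top_idx]))
}))

# Drop zero-similarity pairs, mirroring the filter(sim > 0) step
neighbors <- neighbors[neighbors$sim > 0, ]

neighbors
nrow(neighbors)   # at most length(items) * top_k_toy rows
length(items)^2   # rows in the full long-format matrix
```

The same arithmetic explains the choice of 50 neighbors: with roughly 8,000 items, a full long-format table would have about 64 million rows, while 50 neighbors per item caps it at about 400,000.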

I create the train/test split. I tried this with all users; Spark ran out of memory and crashed. Here it is with the 200 so I can compare it to the original model.

library(dbplyr)
## 
## Attaching package: 'dbplyr'
## The following objects are masked from 'package:dplyr':
## 
##     ident, sql
set.seed(1122)

spark_time2 <- system.time({
  all_user_ids <- spark_anime_user %>% distinct(user_id) %>% collect() %>% pull(user_id) #all user IDs
  sample_ids <- sample(all_user_ids, 200) #sample 200 users
  
  spark_anime_user_sample <- spark_anime_user %>% filter(user_id %in% sample_ids)
  
  #split the sample
  spark_split <- spark_anime_user_sample %>%
    mutate(rand_val = rand(1122L)) %>%
    group_by(user_id) %>%
    window_order(rand_val) %>%
    mutate(rn = row_number(), n_user = n()) %>%
    ungroup() %>%
    mutate(is_test = rn <= floor(0.2 * n_user))
  
  train_tbl <- spark_split %>% filter(!is_test) %>% select(user_id, name, rating)
  test_tbl  <- spark_split %>% filter(is_test)  %>% select(user_id, name, rating)
})


spark_time2
##    user  system elapsed 
##    1.14    0.28    5.60
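The per-user split relies on a window pattern: give every row a random sort key, rank rows within each user, and hold out the first 20%. A base R sketch of the same idea on made-up data (the users, items, and ratings are hypothetical, and `ave()` stands in for the grouped window functions):

```r
set.seed(1122)

# Hypothetical long-format ratings: 3 users with 10, 5, and 20 ratings
toy <- data.frame(
  user_id = rep(c("u1", "u2", "u3"), times = c(10, 5, 20)),
  name    = paste0("item_", sequence(c(10, 5, 20))),
  rating  = sample(1:10, 35, replace = TRUE)
)

# Random sort key per row, then rank rows within each user
toy$rand_val <- runif(nrow(toy))
toy$rn       <- ave(toy$rand_val, toy$user_id, FUN = rank)
toy$n_user   <- ave(toy$rand_val, toy$user_id, FUN = length)

# The first floor(20%) of each user's shuffled rows become the test set
toy$is_test <- toy$rn <= floor(0.2 * toy$n_user)

table(toy$user_id, toy$is_test)
```

Because of `floor()`, a user with fewer than 5 ratings would contribute no test rows; the 15-rating minimum applied earlier avoids that case.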

Join and score items for users:

spark_time3 <- system.time({
  scored <- train_tbl %>%
    inner_join(spark_item_sim, by = c("name" = "item_i")) %>%
    group_by(user_id, item_j) %>%
    summarise(
      score_num = sum(sim * rating, na.rm = TRUE),
      score_den = sum(abs(sim), na.rm = TRUE),
      .groups = "drop"
    ) %>%
    mutate(score = score_num / (score_den + 1e-9))
  
  #exclude items the user already rated in training set
  scored_unrated <- scored %>%
    anti_join(train_tbl %>% rename(item_j = name), by = c("user_id", "item_j"))
  
  recs <- scored_unrated %>%
    group_by(user_id) %>%
    window_order(desc(score)) %>%
    mutate(rank = row_number()) %>%
    filter(rank <= 30) %>%
    ungroup() %>%
    compute("recs")
})


spark_time3
##    user  system elapsed 
##    4.16    0.70   16.53
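The score for a candidate item is a similarity-weighted average of the user’s training ratings: sum(sim * rating) / sum(|sim|) over the rated items that list the candidate as a neighbor. A base R sketch for one user, with made-up ratings and a hypothetical neighbor table:

```r
# Hypothetical training ratings for one user
train_toy <- data.frame(name = c("A", "B"), rating = c(9, 5))

# Hypothetical neighbor table: item_i is a rated item, item_j a candidate
neighbors_toy <- data.frame(
  item_i = c("A", "A", "B", "B"),
  item_j = c("C", "D", "C", "A"),
  sim    = c(0.8, 0.5, 0.2, 0.9)
)

# Join ratings to neighbors, then aggregate per candidate item
joined <- merge(train_toy, neighbors_toy, by.x = "name", by.y = "item_i")
num <- tapply(joined$sim * joined$rating, joined$item_j, sum)
den <- tapply(abs(joined$sim), joined$item_j, sum)
score <- setNames(as.numeric(num / (den + 1e-9)), names(num))

# Remove candidates the user already rated, then rank by score
score  <- score[!(names(score) %in% train_toy$name)]
ranked <- sort(score, decreasing = TRUE)
ranked
```

Item D ranks first even though only one rated item points to it: the score is an average, so a single strong rating is not diluted by the number of neighbors.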

Join recommendations with test set, so performance can be checked:

spark_time4 <- system.time({
  eval_tbl <- recs %>%
    select(user_id, item_j, score, rank) %>%
    left_join(
      test_tbl %>% rename(item_j = name, actual_rating = rating),
      by = c("user_id", "item_j")
    )
})

spark_time4
##    user  system elapsed 
##    0.14    0.02    0.41

Calculate performance metrics, including NDCG (not included in processing time, since this is evaluation):

#precision and recall at 30

relevant_counts <- test_tbl %>%
  filter(rating >= 6) %>%
  group_by(user_id) %>%
  summarise(n_relevant = n(), .groups = "drop")

pr_metrics <- eval_tbl %>%
  group_by(user_id) %>%
  summarise(
    n_recs = n(),
    tp = sum(as.integer(!is.na(actual_rating) & actual_rating >= 6), na.rm = TRUE),
    .groups = "drop"
  ) %>%
  left_join(relevant_counts, by = "user_id") %>%
  mutate(
    precision = tp / n_recs,
    recall = ifelse(!is.na(n_relevant) & n_relevant > 0, tp / n_relevant, NA)
  )

pr_summary <- pr_metrics %>%
  summarise(precision_30 = mean(precision, na.rm = TRUE),
            recall_30 = mean(recall, na.rm = TRUE)) %>%
  collect()
#RMSE 
rmse_by_user <- eval_tbl %>%
  filter(!is.na(actual_rating)) %>%
  mutate(sq_err = (actual_rating - score)^2) %>%
  group_by(user_id) %>%
  summarise(user_rmse = sqrt(mean(sq_err, na.rm = TRUE)), .groups = "drop")

rmse_final <- rmse_by_user %>%
  summarise(rmse = mean(user_rmse, na.rm = TRUE)) %>%
  collect()
#NDCG at 30
dcg_by_user <- eval_tbl %>%
  mutate(rel = ifelse(is.na(actual_rating), 0, actual_rating)) %>%
  mutate(dcg_term = rel / log2(rank + 1)) %>%
  group_by(user_id) %>%
  summarise(dcg = sum(dcg_term, na.rm = TRUE), .groups = "drop")

idcg_by_user <- test_tbl %>%
  group_by(user_id) %>%
  window_order(desc(rating)) %>%
  mutate(ideal_rank = row_number()) %>%
  ungroup() %>%
  filter(ideal_rank <= 30) %>%
  mutate(idcg_term = rating / log2(ideal_rank + 1)) %>%
  group_by(user_id) %>%
  summarise(idcg = sum(idcg_term, na.rm = TRUE), .groups = "drop")

ndcg_final <- dcg_by_user %>%
  full_join(idcg_by_user, by = "user_id") %>%
  mutate(
    dcg = ifelse(is.na(dcg), 0, dcg),
    ndcg = ifelse(is.na(idcg) | idcg == 0, NA, dcg / idcg)
  )

ndcg_summary <- ndcg_final %>%
  summarise(ndcg_30 = mean(ndcg, na.rm = TRUE)) %>%
  collect()
cat("Precision@30:", pr_summary$precision_30, "\n")
## Precision@30: 0.007166667
cat("Recall@30:   ", pr_summary$recall_30, "\n")
## Recall@30:    0.01721301
cat("RMSE:        ", rmse_final$rmse, "\n")
## RMSE:         1.034188
cat("NDCG@30:     ", ndcg_summary$ndcg_30, "\n")
## NDCG@30:      0.01644549
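For reference, precision and recall at k reduce to simple set arithmetic for a single user. The sketch below uses a made-up top-5 list and made-up held-out ratings, with the same relevance threshold of 6 as above.

```r
# Hypothetical top-5 recommendations for one user (best first)
recs_toy <- c("A", "B", "C", "D", "E")

# Hypothetical held-out test ratings for the same user
test_toy <- c(B = 8, E = 4, F = 9, G = 7)

# An item is relevant if its held-out rating is at least 6
relevant <- names(test_toy)[test_toy >= 6]

# True positives: recommended items that are also relevant
tp <- length(intersect(recs_toy, relevant))

precision_at_5 <- tp / length(recs_toy)   # share of recommendations that hit
recall_at_5    <- tp / length(relevant)   # share of relevant items recovered

c(precision = precision_at_5, recall = recall_at_5)
```

With only a handful of held-out items per user and thousands of candidate titles, low absolute values for both metrics are to be expected.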

I ran this once with a larger sample (1000 users), and then decided that for a fairer comparison, the sample size would have to be the same, so I changed it to 200, which ran much more quickly. The results are all a little better than the original. NDCG (normalized discounted cumulative gain) measures how well items are ordered: are the most relevant items at the top of the list? It ranges from 0 to 1. NDCG isn’t great, at about .016, but this is a sparse data set. Also, this is a new metric, so I can’t compare it with the original yet.
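A worked NDCG example for one user, using made-up numbers. As in the code above, the gain of a recommended item is its held-out rating (0 if the item is not in the test set), and the ideal ordering is the test ratings sorted from highest to lowest.

```r
# Hypothetical gains for a top-5 list, in recommended order
# (0 means the recommended item is not in the user's test set)
rel <- c(0, 8, 0, 0, 4)

# Hypothetical held-out test ratings for the same user
test_ratings <- c(8, 4, 9, 7)

k <- length(rel)

# DCG: each gain is discounted by log2(position + 1)
dcg <- sum(rel / log2(seq_len(k) + 1))

# IDCG: the best achievable DCG, from the test ratings in descending order
ideal <- sort(test_ratings, decreasing = TRUE)[seq_len(min(k, length(test_ratings)))]
idcg  <- sum(ideal / log2(seq_along(ideal) + 1))

ndcg <- dcg / idcg
ndcg
```

Two hits out of five still give a modest NDCG here, because the hits sit at positions 2 and 5 and two higher-rated test items were never recommended.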

Disconnect from Spark

spark_disconnect(sc)

The original algorithm (with some adjustments)

I’ve added NDCG, a measure of ranking quality that assesses how well the algorithm sorts recommendations by relevance.

Transforming the data so it’s in the correct format:

#this will also tell me how long it took to transform the data
transform_time <- system.time({
  anime_matrix <- pivot_wider(
    data = user_item,
    names_from = name,
    values_from = rating
  ) |>
    #make user_id the row name
    column_to_rownames("user_id") |> 
    as.matrix()   
  
  
  #create real ratings matrix
  real_anime_matrix <- as(anime_matrix, "realRatingMatrix")
  
  #keep users with more than 15 ratings
  real_anime_matrix20 <- real_anime_matrix[rowCounts(
    real_anime_matrix) > 15, ]
  
  
  item_features2 <- item_features_wide
  
  
  #make show names into row names
  item_features2 <- item_features2 |>
    column_to_rownames("name")
})

transform_time
##    user  system elapsed 
##    2.14    0.64    2.68

Finding only common items (items that users in the data set have rated):

common_time <- system.time({
  common_items <- intersect(rownames(item_features2),
                            colnames(real_anime_matrix20))
  
  item_features_aligned <- item_features2[common_items, ]
  
  rating_matrix_aligned <- real_anime_matrix20[, common_items]
})

common_time
##    user  system elapsed 
##    0.00    0.02    0.03

Again, I didn’t include calculating cosine similarity in processing time because I didn’t factor it into the Spark times (it’s all done outside of Spark).

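library(proxy)
#cbf_predict() below indexes item_sim by column position in rating_m, so the
#rows of item_sim need to be in the same order as the columns of rating_m
item_sim <- as.matrix(simil(as.matrix(item_features2), method = "cosine"))
item_sim_matrix <- as.matrix(item_sim)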

local_time2 <- system.time({
  cbf_predict <- function(user_ratings, item_sim, n = 30) {
    rated_idx <- which(!is.na(user_ratings))
    unrated_idx <- which(is.na(user_ratings))
    
    scores <- sapply(unrated_idx, function(i) {
      sims <- item_sim[i, rated_idx]
      rats <- user_ratings[rated_idx]
      sum(sims * rats) / (sum(abs(sims)) + 1e-9)
    })
    
    names(scores) <- names(user_ratings)[unrated_idx]
    sort(scores, decreasing = TRUE)[1:n]
  }

  set.seed(1122)
  rating_m <- as(rating_matrix_aligned, "matrix")
  n_users <- nrow(rating_m)
  test_users <- sample(1:n_users, 200)
})

precision_list <- c()
recall_list <- c()
rmse_list <- c()
ndcg_list <- c()

local_time <- system.time(for (i in test_users) {
  user_ratings <- rating_m[i, ]
  rated_idx <- which(!is.na(user_ratings))
  if (length(rated_idx) < 5) next
  
  test_idx <- sample(rated_idx, max(1, floor(0.2 * length(rated_idx))))
  train_ratings <- user_ratings
  train_ratings[test_idx] <- NA
  
  preds <- cbf_predict(train_ratings, item_sim, n = 30)
  pred_items <- names(preds)
  
  relevant <- names(user_ratings[test_idx][user_ratings[test_idx] >= 6])
  tp <- length(intersect(pred_items, relevant))
  precision_list[i] <- tp / length(pred_items)
  recall_list[i]    <- ifelse(length(relevant) > 0, tp / length(relevant), NA)
  
  common <- intersect(pred_items, names(user_ratings[test_idx]))
  if (length(common) > 0) {
    actual <- user_ratings[test_idx][common]
    predicted <- preds[common]
    rmse_list[i] <- sqrt(mean((actual - predicted)^2))
  }
  
  # NDCG@30
  k <- length(pred_items)
  rel <- ifelse(pred_items %in% names(user_ratings[test_idx]),
                user_ratings[test_idx][pred_items], 0)
  rel[is.na(rel)] <- 0
  
  dcg <- sum(rel / log2(seq_len(k) + 1))
  
  ideal_rel <- sort(user_ratings[test_idx], decreasing = TRUE)
  ideal_rel <- ideal_rel[1:min(k, length(ideal_rel))]
  idcg <- sum(ideal_rel / log2(seq_along(ideal_rel) + 1))
  
  ndcg_list[i] <- ifelse(idcg > 0, dcg / idcg, NA)
})

cat("Precision@30_cf:", mean(precision_list, na.rm = TRUE), "\n")
## Precision@30_cf: 0.002833333
cat("Recall@30_cf:   ", mean(recall_list, na.rm = TRUE), "\n")
## Recall@30_cf:    0.004099047
cat("RMSE_cf:        ", mean(rmse_list, na.rm = TRUE), "\n")
## RMSE_cf:         1.348699
cat("NDCG@30_cf:     ", mean(ndcg_list, na.rm = TRUE), "\n")
## NDCG@30_cf:      0.003492059
local_time
##    user  system elapsed 
##   10.39    1.22   11.20
local_time2
##    user  system elapsed 
##    0.78    0.28    1.10

NDCG is much worse, at about .35% compared to the Spark version’s ~1.6%, meaning the relevant recommendations do not appear toward the top.

Comparing results

Aggregating results into a table:

comparison <- tibble(
  Metric = c("Precision@30", "Recall@30", "RMSE", "NDCG@30"),
  R_CBF = c(
    mean(precision_list, na.rm = TRUE),
    mean(recall_list, na.rm = TRUE),
    mean(rmse_list, na.rm = TRUE),
    mean(ndcg_list, na.rm = TRUE)
  ),
  Spark_CBF = c(
    pr_summary$precision_30,
    pr_summary$recall_30,
    rmse_final$rmse,
    ndcg_summary$ndcg_30
  )
)

comparison
## # A tibble: 4 × 3
##   Metric         R_CBF Spark_CBF
##   <chr>          <dbl>     <dbl>
## 1 Precision@30 0.00283   0.00717
## 2 Recall@30    0.00410   0.0172 
## 3 RMSE         1.35      1.03   
## 4 NDCG@30      0.00349   0.0164

Creating bar charts to compare different results:

#bar chart

comparison |>
  pivot_longer(c(R_CBF, Spark_CBF), names_to = "Implementation", values_to = "Value") |>
  filter(Metric %in% c("Precision@30", "Recall@30")) |>
  mutate(Implementation = reorder(Implementation, -Value)) |>
  ggplot(aes(x = Metric, y = Value, fill = Implementation)) +
  geom_col(position = "dodge") +
  labs(title = "Precision & Recall at 30 Suggestions: R vs Spark CBF", x = NULL, y = NULL) +
  theme_minimal() +
  theme(axis.text.x = element_text(angle = 30))

comparison |>
  pivot_longer(c(R_CBF, Spark_CBF), names_to = "Implementation", values_to = "Value") |>
  filter(Metric %in% c("RMSE")) |>
  mutate(Implementation = reorder(Implementation, -Value)) |>
  ggplot(aes(x = Metric, y = Value, fill = Implementation)) +
  geom_col(position = "dodge") +
  labs(title = "RMSE: R vs. Spark", x = NULL, y = NULL) +
  theme_minimal() 

comparison |>
  pivot_longer(c(R_CBF, Spark_CBF), names_to = "Implementation", values_to = "Value") |>
  filter(Metric %in% c("NDCG@30")) |>
  mutate(Implementation = reorder(Implementation, -Value)) |>
  ggplot(aes(x = Metric, y = Value, fill = Implementation)) +
  geom_col(position = "dodge") +
  labs(title = "NDCG@30: R vs. Spark", x = NULL, y = NULL) +
  theme_minimal() 

Both versions are doing similar things with the same sample size. However, in general, the Spark version performs a lot better. RMSE is lower for Spark (1.03 vs. 1.35); precision and recall@30 are both substantially higher. The Spark version predicted ratings more accurately, produced a higher proportion of relevant recommendations (precision), and recovered more of each user’s relevant items (recall). The original version also does a worse job of putting the most relevant recommendations at the top: NDCG is about 1.6% for Spark and under .5% for R.

There are a number of differences in the way I ran the algorithm that could account for this. I set the same seed, but the sample of 200 users was chosen differently, and the train/test split was also done differently. For efficiency (and so Spark didn’t crash), I also kept only the top 50 neighbors for each item, which could be responsible for the differences: the Spark version had no access to long-tail/low-similarity items, which could be noisy.

Comparing run time

#Adding all the calculated run times together for Spark:
total_spark_time <- spark_time + spark_time2 + spark_time3 + 
  spark_time4 + copy_time + copy_time2

total_spark_time 
##    user  system elapsed 
##   43.60    4.67   87.65
#R was all run in one chunk + the time it took to get the data sets in the
#proper format at the end
local_total_time <- local_time + local_time2 + common_time + transform_time


local_total_time
##    user  system elapsed 
##   13.31    2.16   15.01

I’m comparing total elapsed time, since Spark’s work runs in a separate process and may not be fully reflected in R’s user time. Overall, the Spark algorithm took about 5.8x as long to run (87.65 vs. 15.01 seconds elapsed, including copying time). However, it still only took about a minute and a half in total.
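Using the elapsed times printed in this knit (which, per the note at the top, can differ from run to run), the ratio can be worked out directly. The two values below are copied by hand from the output above rather than read from the timing objects.

```r
# Elapsed seconds copied from the timing output above
spark_elapsed <- 87.65
local_elapsed <- 15.01

# How many times longer the Spark version took
ratio <- spark_elapsed / local_elapsed
round(ratio, 1)

# Spark total expressed in minutes
round(spark_elapsed / 60, 1)
```

In a live session the same numbers are available as `total_spark_time[["elapsed"]]` and `local_total_time[["elapsed"]]`.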

Conclusion

For your given recommender system’s data, algorithm(s), and (envisioned) implementation, at what point would you see moving to a distributed platform such as Spark becoming necessary?

Using Spark for a small data set

There are upfront costs to using Spark: some things take longer at the beginning, and the data set has to be copied. For the current data set, it took a lot of extra work to do the same thing with a sample of only 200 users. I tried running this algorithm with the entire data set on my local machine (single node). Spark crashed and disconnected a few times, which meant I had to start a fresh connection and rerun all the code for everything not stored in the R environment. Running the full data set is also basically impossible in plain R without Spark, but in the R environment I could just stop and adjust rather than restarting my session and rerunning all the code that requires a Spark session.

Overall, the processing times differ by a large factor (about 5.8x), but neither takes very long (about 15 seconds vs. about a minute and a half). However, setting Spark up, the disconnects, and the crashes caused needless complications for a small data set. I also had to adjust the code, which required troubleshooting.

So, at this scale, Spark is not necessarily worth it. I could also have sampled the data before copying it to a Spark DF, which may have reduced processing time significantly (this would have reduced the number of users and items, because a small set of users will have interacted with fewer items). But since I already had to run two different algorithms, this would have made the comparison even less direct.
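A sketch of what sampling before the copy could look like. The data here is a made-up stand-in for user_item_15 (50 hypothetical users with 20 ratings each), and the copy_to call is left as a comment because it needs a live connection.

```r
set.seed(1122)

# Hypothetical stand-in for user_item_15: 50 users x 20 ratings each
ratings <- data.frame(
  user_id = rep(1:50, each = 20),
  name    = paste0("item_", sample(1:200, 1000, replace = TRUE)),
  rating  = sample(1:10, 1000, replace = TRUE)
)

# Sample users locally, then keep only their rows
sample_ids     <- sample(unique(ratings$user_id), 10)
ratings_sample <- ratings[ratings$user_id %in% sample_ids, ]

nrow(ratings)                         # rows copied without sampling
nrow(ratings_sample)                  # rows copied after sampling
length(unique(ratings_sample$name))   # distinct items left in the sample

# With a live connection `sc`, only the smaller table would be sent:
# spark_sample <- copy_to(sc, ratings_sample, name = "ratings_sample", overwrite = TRUE)
```

In this document the full table took about 19 seconds to copy, so shrinking it first would mainly cut that step.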

When does Spark become necessary?

This comparison is run with a sample of 200 users. 200 is not Spark’s limit on this node (I tested and ran the process successfully with 1000 users, and could have easily done more). It does not make sense to use Spark for a small data set due to the upfront costs and overall amount of time. This data set is too small to benefit from Spark’s efficiencies, like lazy evaluation.

However, the upfront cost to using Spark makes sense with larger data sets and more users, such as the full data set (downloadable on Kaggle, contains data from 73,516 users on 12,294 anime). To process the entire data set of ~74k users, Spark and extra computing power would absolutely be necessary. Essentially, implementing anything sophisticated and actually usable on a site with this many users requires a more efficient way to compute and update recommendations. Recommendations would have to be updated regularly for many users, as would items, which requires a more efficient system.

The below graphic, taken from Mastering Spark With R, published in 2019, shows the record from that time. This suggests that Spark and appropriate computing power would make short work of a data set this small.

With this data set, a few features of Spark would be particularly useful:

Lazy evaluation - I know there were some redundancies in my initial code (like making something wide, then long, then wide again). Spark can assess all transformations that need to be run and figure out how to run them in the most efficient way. Pivoting a few times isn’t a big deal with a data set like this, but could take much longer depending on the size of the data set.

In-memory processing - Spark can cache data in memory. For processes like this, where the same data is touched or transformed multiple times, rereading that data from disk each time would increase processing times.

Real-time stream processing - More efficient processing for data sets that are updated (so, not useful for a static data set, but the real-world version of this data set constantly gets new ratings).

Other concerns & notes

Running the data set locally, I was able to calculate cosine similarity for all items. However, I had to limit items’ cosine similarity to 50 neighbors in Spark. I would keep this with a large-scale recommender for efficiency. Limiting k to 50 may have resulted in better, more accurate recommendations (higher precision, recall, NDCG). However, new items and long-tail items might be harder to incorporate with this limitation.

At first I overestimated Spark’s ability to create computing efficiencies on my local machine and attempted to calculate all cosine similarities and to run the algorithm on all users. This caused Spark to crash and disconnect. It also disconnected on its own once.

Sources

Claude Sonnet, 5 [Large language model] Accessed July 2026. Claude.ai

Javier Luraschi, Kevin Kuo & Edgar Ruiz. Mastering Spark With R. 2019. https://therinspark.com/intro.html#intro-spark

John Tringham. Spark Concepts Simplified: Lazy Evaluation. https://medium.com/@john_tringham/spark-concepts-simplified-lazy-evaluation-d398891e0568