Overview

The goal of this project is to implement a simple recommender using Spark. We’ll be using sparklyr as our interface to spark because of its itegration with the easy-to-use tidyverse functions.

Load Data

This step is not trivial because Spark cannot read data using http requests. It can only read from a filesystem or S3. There is an example setting up Spark to read from S3 on https://medium.com/ibm-data-science-experience/read-and-write-data-to-and-from-amazon-s3-buckets-in-rstudio-1a0f29c44fa7

library(sparklyr)
library(tidyverse)
library(kableExtra)

conf <- spark_config()
conf$sparklyr.defaultPackages <- c("com.databricks:spark-csv_2.10:1.5.0",
                                   "com.amazonaws:aws-java-sdk-pom:1.10.34",
                                   "org.apache.hadoop:hadoop-aws:2.7.3")

spark_conn <- spark_connect('local', config = conf)
access <- Sys.getenv("AWS_ACCESS_KEY_ID")
secret <-  Sys.getenv("AWS_SECRET_ACCESS_KEY")

prepare_s3 <- function(key, secret){
  ctx <- sparklyr::spark_context(spark_conn)
  
  jsc <- invoke_static( spark_conn, "org.apache.spark.api.java.JavaSparkContext", "fromSparkContext", ctx )
  
  
  jsc <- invoke_static(spark_conn, 
                       "org.apache.spark.api.java.JavaSparkContext", 
                       "fromSparkContext", 
                       ctx)
  hconf <- sparklyr::invoke(jsc, "hadoopConfiguration") 
  hconf %>% sparklyr::invoke("set", "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  hconf %>% sparklyr::invoke("set","fs.s3a.access.key", key)
  hconf %>% sparklyr::invoke("set","fs.s3a.secret.key", secret)
}
prepare_s3(access, secret) #easy function for future projects
## NULL
s3_path <- 's3a://data612project5/Project5/ratings_sub.csv'

ratings_data <- spark_read_csv(spark_conn, name = 'ratings', path = s3_path)

print(ratings_data, n = 5, width = Inf)
## # Source: spark<ratings> [?? x 5]
##    `_c0`  user  item rating timestamp
##    <int> <int> <int>  <dbl>     <int>
##  1     0     1     1      4 964982703
##  2     1     1     3      4 964981247
##  3     2     1     6      4 964982224
##  4     3     1    47      5 964983815
##  5     4     1    50      5 964982931
##  6     5     1    70      3 964982400
##  7     6     1   101      5 964980868
##  8     7     1   110      4 964982176
##  9     8     1   151      5 964984041
## 10     9     1   157      5 964984100
## # ... with more rows

Build and Test Recommender

Standard Model

This model will be built with ALS. To test tune our hyperparameters, we’ll take advantage of the gridsearch feature in Spark. Even when run locally, this runs quickly, as we can set the parraelization equal to the number of cores.

estimator <- ml_pipeline(spark_conn) %>%
  ml_als(rating_col = 'rating', user_col = 'user', item_col = 'item', max_iter = 15, cold_start_strategy = 'drop')  


als_grid <- list(als = list(rank = c(15,20), reg_param = c(.05, .1)))
cv <- ml_cross_validator(
  spark_conn, 
  estimator = estimator,
  evaluator = ml_regression_evaluator(spark_conn, label_col = 'rating'), 
  estimator_param_maps = als_grid,
  num_folds = 4,
  parallelism = 4
)

als_cv <- ml_fit(cv, ratings_data)
ml_validation_metrics(als_cv)

Binary Model

We’ll use again ALS to build our recommender and convert the data to binary feedback. The comparison point can be found under the header “ALS Model” at https://github.com/TheFedExpress/DATA612/blob/master/Project%204/Project%204.ipynb

We’ll first do cross validation using the typcial recommender evaluation metrics, precion and recall. Unfortunately, there isn’t a recommender-specific CV splitting function, as there is in the lensKit package in Python. Instead, we’ll simply do manual bootstrapped CV with a loop.

dfs<- list()

  for (i in 1:2){ 
    set.seed(42 + i)
    partitioned_set <- ratings_data %>%
      sdf_random_split(training = .8, testing = .2) 
      
    als_mod <- partitioned_set[[1]] %>%
      ml_als(rating_col = 'rating', user_col = 'user', item_col = 'item', max_iter = 15, implicit_prefs = TRUE, rank = 20, reg_param = .1)
      
    recs <- ml_recommend(als_mod, type = 'item', 10) %>%
      full_join(partitioned_set[[2]], c('user', 'item'), suffix = c('_pred', '_act')) %>%
      mutate(truth_cat = ifelse(is.na(rating_pred) == 1 & is.na(rating_act) == 0, 'FN', '')) %>%
      mutate(truth_cat = ifelse(is.na(rating_pred) == 0 & is.na(rating_act) == 1, 'FP', truth_cat)) %>%
      mutate(truth_cat = ifelse(is.na(rating_pred) == 0 & is.na(rating_act) == 0, 'TP', truth_cat)) %>%
      group_by(truth_cat) %>%
      summarise(tot_obs = n()) %>%
      ungroup() %>%
      collect()
    
    recs_cm <- recs %>%#We have few enough records at this point that bringing local to use the spread function isn't a problem
      spread(truth_cat, tot_obs) %>%
      mutate(
        precision = TP/(TP + FP),
        recall = TP/(TP + FN),
        F1 =  2*((precision*recall)/(precision + recall))
      )
    dfs[[i]] <- recs_cm
  }
  summary_df <- bind_rows(dfs) %>%
    summarise_all(mean)

summary_df %>% kable() %>% kable_styling(bootstrap_options = "striped", full_width = F)
FN FP TP precision recall F1
18634 5450.5 649.5 0.1064754 0.0336832 0.0511763

Note: Because of the manner in which LensKit split the data in the Project 4 notebook, the recall metric isn’t an apples to apples comparison. LensKit put half of the users in each CV fold and tested on 20% of the ratings from those users.

Conclusion

The Spark implementation of ALS runs considerably faster than the versoin in the Python “Implicit” package, though the accuracy is similar. There are definitely costs to doing most of your work in Spark, as some specific functions are not available, such as some cross validation helpers. It might be best practice to do model tuning requiring exotic packages on a subet of your data locally, then do your final and more standard tuning on Spark.

In practice, it would be necessary to use a remote Spark connection when you can no longer fit the contents of all the ALS operations in memory on your local system. Even 100 million row ratings tables fit easily in memory on most modern PCs. It’s the intermediate calculations of the ALS algorithm that hog all the memory.