library(tidyverse)
library(recommenderlab)
Project 5
Project 5 requires implementing a recommendation system via Apache Spark in order to evaluate whether doing so improves computational efficiency.
Apache Spark
Apache Spark is an open-source engine for distributed data processing and machine learning on large data sets. Spark splits work into tasks that run in parallel across a cluster of computers (or across the cores of a single machine), which improves performance compared with running the same processes sequentially on one core. A key to Spark’s efficiency is that it processes and keeps data in memory where possible (as opposed to repeatedly reading from and writing to disk).
Spark works through the Spark Driver, the primary process that coordinates with the cluster manager. In simpler terms, the Spark Driver is responsible for coordinating the worker nodes that store and process the data. When working with data frames, Spark splits the data into partitions that are distributed across the nodes of the cluster (the underlying abstraction is the resilient distributed data set, or RDD), and users are able to perform transformations and actions on the data in parallel. Spark uses directed acyclic graphs (DAGs) to schedule tasks and assign work across nodes, which improves execution efficiency by reducing unnecessary data shuffling.[1]
For this assignment, I will work with the Jester5k data set, which I previously used for Project 4.
data("Jester5k")
Recommenderlab
First, I will create a recommendation system via the recommenderlab package in R, using the Alternating Least Squares (ALS) approach. ALS seeks to approximate the ratings matrix as the product of two smaller matrices, U and the transpose of P. The optimal values are determined by holding one matrix (U) constant and finding values for the other matrix (P) that minimize the squared errors between the approximated matrix and the original matrix. Then, the matrix P is held constant at those values and matrix U is optimized to minimize the squared errors, and the two steps alternate. These matrices have rows corresponding to the users (U) or items (P) and columns corresponding to latent factors, which can be thought of as concepts (e.g., for movies, one latent factor might represent a genre while another might capture the budget of a movie). Accordingly, these matrices capture how users and items score on these latent factors, and the dot product of a user's row and an item's row approximates, or predicts, how that user would rate that item. ALS works well with parallelization because, once one matrix is held constant, each row of the other matrix (each user's or item's factor vector) can be solved for independently.[2]
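The alternating updates described above can be sketched in a few lines of base R. This is a minimal illustration under stated assumptions, not recommenderlab's or Spark's implementation: the toy ratings matrix, the rank `k`, the penalty `lambda`, and the helper names `solve_rows()` and `rmse()` are all made up for the example, and it uses a plain ridge penalty.

```r
# Toy ALS on a small ratings matrix (NA = not rated). Illustrative only.
set.seed(1)
R <- matrix(c( 5,  3, NA,  1,
               4, NA, NA,  1,
               1,  1, NA,  5,
              NA,  1,  5,  4), nrow = 4, byrow = TRUE)

k      <- 2     # number of latent factors (assumed for the example)
lambda <- 0.1   # ridge penalty (assumed for the example)

U <- matrix(rnorm(nrow(R) * k), nrow(R), k)  # user factors, one row per user
P <- matrix(rnorm(ncol(R) * k), ncol(R), k)  # item factors, one row per item

# With `fixed` held constant, solve a small ridge regression for every row
# of `ratings`, using only that row's observed entries. Each row is solved
# independently of the others, which is what makes ALS easy to parallelize.
solve_rows <- function(ratings, fixed, lambda) {
  k <- ncol(fixed)
  t(sapply(seq_len(nrow(ratings)), function(i) {
    obs <- !is.na(ratings[i, ])
    A   <- fixed[obs, , drop = FALSE]
    solve(crossprod(A) + lambda * diag(k), crossprod(A, ratings[i, obs]))
  }))
}

# RMSE over the observed entries only
rmse <- function(R, U, P) sqrt(mean((R - U %*% t(P))^2, na.rm = TRUE))

rmse_start <- rmse(R, U, P)
for (iter in 1:10) {
  U <- solve_rows(R, P, lambda)     # hold P constant, update U
  P <- solve_rows(t(R), U, lambda)  # hold U constant, update P
}
rmse_end <- rmse(R, U, P)

predicted <- U %*% t(P)  # includes estimates for the NA cells
```

Comparing `rmse_start` with `rmse_end` shows how much the alternating updates reduce the error on the observed ratings, and `predicted` holds the model's estimates for the unrated cells.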
Below I create the recommender via recommenderlab. I start with an evaluation scheme that splits the data into train and test sets.
set.seed(123)
eval_scheme <- evaluationScheme(Jester5k, method = 'split',
                                train = 0.8, given = 10, goodRating = 0)
I capture the train, test, and evaluation sets as data frames that I will later use in Spark.
train_set <- getData(eval_scheme, 'train')
train_set_df <- as(train_set, 'data.frame')

test_set <- getData(eval_scheme, 'known')
test_set_df <- as(test_set, 'data.frame')

eval_set <- getData(eval_scheme, 'unknown')
eval_set_df <- as(eval_set, 'data.frame')
Below are the parameters for the ALS recommender:
recommenderRegistry$get_entry("ALS", dataType = "realRatingMatrix")
Recommender method: ALS for realRatingMatrix Description: Recommender
for explicit ratings based on latent factors, calculated by
alternating least squares algorithm. Reference: Yunhong Zhou, Dennis
Wilkinson, Robert Schreiber, Rong Pan (2008). Large-Scale Parallel
Collaborative Filtering for the Netflix Prize, 4th Int'l Conf.
Algorithmic Aspects in Information and Management, LNCS 5034.
Parameters:
  normalize lambda n_factors n_iterations min_item_nr seed
1      NULL    0.1        10           10           1 NULL
lambda is a penalty (regularization) parameter that is used to prevent overfitting. n_factors specifies how many latent factors to include in the matrices U and P (corresponding to the number of columns in each). Similar to SVD, a larger number of latent factors corresponds to a closer approximation of the original matrix; however, latent factors with low explanatory power can be dropped while still keeping a close representation of the original matrix. n_iterations is the number of times the algorithm alternates between optimizing the two matrices to reduce the squared errors. Lastly, min_item_nr is the minimum number of interactions an item must have before its factors are used for prediction (otherwise, the global average is used).
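As a sketch of where lambda enters, a common way to write the regularized ALS objective is shown below. The notation is assumed here and is not taken from the recommenderlab documentation: $r_{ui}$ is an observed rating, $K$ is the set of observed (user, item) pairs, and $\mathbf{u}_u$ and $\mathbf{p}_i$ are the rows of U and P for user $u$ and item $i$. The paper cited in the registry entry uses a weighted variant in which each penalty term is scaled by the number of ratings for that user or item.

```latex
$$
\min_{U,\,P} \; \sum_{(u,i) \in K} \left( r_{ui} - \mathbf{u}_u^{\top} \mathbf{p}_i \right)^2
\; + \; \lambda \left( \sum_{u} \lVert \mathbf{u}_u \rVert^2 + \sum_{i} \lVert \mathbf{p}_i \rVert^2 \right)
$$
```

With P held constant, the problem for each $\mathbf{u}_u$ reduces to a ridge regression, and likewise for each $\mathbf{p}_i$ when U is held constant; a larger lambda shrinks the factor vectors toward zero.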
I will start with the default values (apart from centering the ratings and setting a seed), and below I capture the run time.
start_time <- Sys.time()

als_recommenderlab <- Recommender(getData(eval_scheme, 'train'), 'ALS',
                                  parameter = list(normalize = 'center',
                                                   n_factors = 10,
                                                   min_item_nr = 1,
                                                   seed = 123))

end_time <- Sys.time()

als_recommenderlab_run_time <- end_time - start_time
als_recommenderlab_run_time
Time difference of 0.003522158 secs
Next, I predict the ratings in the test set and capture the run time.
start_time <- Sys.time()

als_recommernderlab_predictions <- predict(als_recommenderlab, getData(eval_scheme, 'known'), type = 'ratings')

end_time <- Sys.time()

als_recommenderlab_predict_run_time <- end_time - start_time
als_recommenderlab_predict_run_time
Time difference of 1.129575 mins
Below are the accuracy metrics:
als_recommenderlab_metrics <- calcPredictionAccuracy(als_recommernderlab_predictions, getData(eval_scheme, 'unknown'))
als_recommenderlab_metrics
RMSE MSE MAE
5.838981 34.093694 4.665374
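For reference, the three metrics reported above can be computed by hand. The sketch below uses made-up `actual` and `predicted` vectors (they are not taken from the Jester5k results) purely to show the arithmetic.

```r
# Made-up ratings and predictions, for illustration only
actual    <- c(-0.39, -7.38, -4.13, 4.95)
predicted <- c( 1.00, -6.00, -2.00, 3.00)

errors <- predicted - actual

mse  <- mean(errors^2)     # mean squared error
rmse <- sqrt(mse)          # root mean squared error, in rating units
mae  <- mean(abs(errors))  # mean absolute error

c(RMSE = rmse, MSE = mse, MAE = mae)
```

Because RMSE squares the errors before averaging, it penalizes large misses more heavily than MAE, which is why RMSE is never smaller than MAE.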
Spark
Next, I will implement the ALS recommender via Spark, which has built-in machine learning algorithms, including ALS for recommendations.[3] For this assignment, I will connect to Spark in local mode, which parallelizes across the cores available on the local machine as opposed to across multiple servers.
library(sparklyr)
sc <- spark_connect(master = 'local')
Next, I copy the required data frames into Spark. Spark’s framework for recommendation algorithms works differently from recommenderlab’s in that there is no “known” set (which recommenderlab uses to learn a test user’s preferences from a given number of items) and no “unknown” evaluation set (on which recommenderlab predicts ratings or recommendations and compares them with the actual values). Thus, I will combine the train set and the “known” test set from recommenderlab into the Spark training data, and the “unknown” evaluation set will serve as the Spark test set.
train_set_spark <- rbind(train_set_df, test_set_df)

train_set_tbl <- copy_to(sc, train_set_spark, 'train_set', overwrite = TRUE)
eval_set_tbl <- copy_to(sc, eval_set_df, 'eval_set', overwrite = TRUE)
Spark requires numeric values for user and item IDs, which I create below.
train_set_tbl <- train_set_tbl |> ft_string_indexer(input_col = 'user', output_col = 'user_id')

train_set_tbl <- train_set_tbl |> ft_string_indexer(input_col = 'item', output_col = 'item_id')
Next, I match the IDs from the train set user and items to the test set user and items:
user_ids <- train_set_tbl |> select(user, user_id) |> distinct()
item_ids <- train_set_tbl |> select(item, item_id) |> distinct()

eval_set_tbl <- eval_set_tbl |> left_join(user_ids, by = 'user') |>
  left_join(item_ids, by = 'item')
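The indexing and matching steps above can be illustrated locally with base R. This is only a sketch on made-up data: it mimics the default ordering of ft_string_indexer (most frequent label gets index 0) and shows why a left join matters, since any user that never appeared in the training data would end up with a missing ID. The data frames `train` and `eval` and the user `u9` are hypothetical.

```r
# Made-up training ratings: u3 appears three times, u2 twice, u1 once
train <- data.frame(user   = c("u1", "u2", "u2", "u3", "u3", "u3"),
                    rating = c(1.5, -2.0, 3.0, 0.5, 4.0, -1.0))

# Frequency-descending index starting at 0, similar in spirit to the
# default behaviour of ft_string_indexer
counts   <- sort(table(train$user), decreasing = TRUE)
user_ids <- data.frame(user = names(counts), user_id = seq_along(counts) - 1)

# Made-up evaluation ratings, including a user (u9) not seen in training
eval <- data.frame(user = c("u3", "u1", "u9"), rating = c(2.0, -3.5, 1.0))

# Left join: keep every evaluation row, attach the training-based ID
eval_mapped <- merge(eval, user_ids, by = "user", all.x = TRUE)

# Rows whose user has no ID would need to be dropped or handled separately
unmatched <- eval_mapped[is.na(eval_mapped$user_id), ]
```

In this report every evaluation user also appears in the Spark training data (through the “known” ratings), so no IDs should be missing, but a check like `unmatched` is a cheap safeguard.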
Next, I fit the ALS model on the training data. I use the same number of latent factors (10) and the same regularization parameter (0.1) as in the recommenderlab model.
start_time <- Sys.time()

als_model <- ml_als(
  train_set_tbl,
  rating_col = "rating",
  user_col = "user_id",
  item_col = "item_id",
  rank = 10,
  reg_param = 0.1
)

end_time <- Sys.time()

als_spark_run_time <- end_time - start_time
Next, I predict the ratings for the test set:
start_time <- Sys.time()

spark_als_predictions <- ml_predict(als_model, eval_set_tbl,
                                    rating_col = "rating",
                                    user_col = "user_id",
                                    item_col = "item_id")

end_time <- Sys.time()

als_sparklyr_predict_run_time <- end_time - start_time
spark_als_predictions
# Source: table<`sparklyr_tmp_a1623aea_c107_4772_860f_3ea9e06e0ef3`> [?? x 6]
# Database: spark_connection
   user  item  rating user_id item_id prediction
   <chr> <chr>  <dbl>   <dbl>   <dbl>      <dbl>
 1 u8086 j5     -0.39    4417      18    -14.2
 2 u8086 j7     -7.38    4417       5     -7.81
 3 u8086 j8     -4.13    4417      17     -6.15
 4 u8086 j11     4.95    4417      39     -3.62
 5 u8086 j12     6.31    4417      37     -0.131
 6 u8086 j15    -1.99    4417      13      0.356
 7 u8086 j16    -3.59    4417       7      3.76
 8 u8086 j18    -2.14    4417       0      2.44
 9 u8086 j20     0.68    4417      11     -4.32
10 u8086 j21    -6.65    4417       6     -4.77
# ℹ more rows
recommenderlab's accuracy functions do not work on Spark tables, so I calculate the RMSE manually (sparklyr's ml_regression_evaluator() is an alternative):
spark_als_predictions <- spark_als_predictions |> mutate(squared_error = (prediction - rating)^2)
head(spark_als_predictions)
# Source: SQL [?? x 7]
# Database: spark_connection
  user  item  rating user_id item_id prediction squared_error
  <chr> <chr>  <dbl>   <dbl>   <dbl>      <dbl>         <dbl>
1 u8086 j5     -0.39    4417      18    -14.2         192.
2 u8086 j7     -7.38    4417       5     -7.81          0.185
3 u8086 j8     -4.13    4417      17     -6.15          4.07
4 u8086 j11     4.95    4417      39     -3.62         73.4
5 u8086 j12     6.31    4417      37     -0.131        41.5
6 u8086 j15    -1.99    4417      13      0.356         5.50
spark_rmse <- spark_als_predictions |> select(squared_error) |> summarise(rmse = sqrt(mean(squared_error)))
spark_rmse
Warning: Missing values are always removed in SQL aggregation functions.
Use `na.rm = TRUE` to silence this warning
This warning is displayed once every 8 hours.
# Source: SQL [?? x 1]
# Database: spark_connection
rmse
<dbl>
1 5.34
local_spark_rmse <- spark_rmse |> collect()
spark_disconnect(sc)
Comparison
Below are the RMSE values of the recommenderlab model and the Spark model on their respective test sets:
als_recommenderlab_metrics[1]
RMSE
5.838981
local_spark_rmse
# A tibble: 1 × 1
rmse
<dbl>
1 5.34
Both models have similar RMSE values (5.84 for recommenderlab versus 5.34 for Spark), which makes sense as they used the same number of latent factors (10) and the same regularization parameter (0.1). The RMSE values are not identical because the training and test sets differed slightly between recommenderlab (train set, known set, unknown set) and Spark (train and test set). Additionally, the ratings passed to Spark were not centered to remove user bias, whereas recommenderlab centers them automatically via the normalize parameter.
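From a performance efficiency perspective, training was faster via recommenderlab: the Recommender() call returned in about 0.004 seconds, versus about 3.6 seconds for Spark. The near-instant recommenderlab time suggests that most of its ALS computation is deferred to the prediction step, so the two training times may not be directly comparable.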
als_recommenderlab_run_time
Time difference of 0.003522158 secs
als_spark_run_time
Time difference of 3.630132 secs
However, implementing via Spark resulted in a notable difference in efficiency (roughly 68 seconds faster) when generating predictions:
als_recommenderlab_predict_run_time
Time difference of 1.129575 mins
als_sparklyr_predict_run_time
Time difference of 0.1292229 secs
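Generating predictions via ALS requires calculating each user's factor vector. Distributing this process across multiple cores leads to a performance improvement versus running this process sequentially. One caveat is that Spark evaluates transformations lazily, so the Spark timing above may not include all of the work needed to materialize the predictions.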
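The prediction run times above are printed in different units (minutes for recommenderlab, seconds for Spark), so it helps to convert them to a common unit before comparing. A minimal sketch, re-entering the two reported values by hand as difftime objects (the variable names here are hypothetical):

```r
# Reported prediction run times, re-entered by hand from the output above
recommenderlab_predict <- as.difftime(1.129575, units = "mins")
spark_predict          <- as.difftime(0.1292229, units = "secs")

# Convert both to seconds before comparing
recommenderlab_secs <- as.numeric(recommenderlab_predict, units = "secs")
spark_secs          <- as.numeric(spark_predict, units = "secs")

diff_secs <- recommenderlab_secs - spark_secs
diff_secs
```

In the report itself, the same conversion could be applied directly to the stored objects, for example `as.numeric(als_recommenderlab_predict_run_time, units = "secs")`.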
This raises the question of when it would be more appropriate to use a platform like Spark for implementing a recommendation engine pipeline. Spark is designed for big data workloads,[4] so the larger a data set is, the more appropriate using Spark becomes. However, there is a cost to using Spark, as running servers on platforms such as Databricks, Azure, or AWS is not free. Spark is best suited to production models that must deliver near-real-time results on enormous data sets. For example, a recommendation engine for Amazon product recommendations, Netflix movie/show recommendations, or Spotify song recommendations would call for a distributed platform such as Spark. In fact, Spotify has discussed how using Spark led to a meaningful improvement in the efficiency of their recommendation models.[5]
For the purposes of this class, a data set of one million or more observations could be a good use case for implementing Spark via a free version of a cloud service such as Databricks or Azure. However, this poses an additional challenge: Spark’s built-in recommendation algorithm is limited to ALS. Implementing neighborhood-based collaborative filtering, content-based filtering, or SVD-based recommenders within Spark would require building out those algorithms manually rather than using existing libraries and functions.