Implementing a Recommender System on Spark

Unlike the previous four assignments, this project leverages the Amazon Video Game Reviews data, which includes three related files:

File Name                  File Size  User Qty  Item Qty     Note
ratings_Video_Games.csv*   53 Mb      826,767   50,210       All reviews included, no filters.
Video_Games_5.json*        312 Mb     24,303    10,672       Reviews from users who have at least 5 reviews.
metadata.json              10.3 Gb    N/A       9.4 Million  Additional features like description and price, per asin.

Files marked with a * required decompression prior to reading in.

The target for this project is the first file, ratings_Video_Games.csv. I will process it locally using standard R methods from base and the tidyverse, then process the same local files from a Spark node (distributed processing via the sparklyr library), and compare the two approaches' performance where possible. The metadata.json file may be tackled for the Final Project, which will leverage this same data.

library(tidyverse) 
library(sparklyr)
library(pryr)

Read-in Performance

In order to use the above data in a recommender system, I first need to be able to read it in and run queries. By testing this basic first step in the process of building a recsys, I can compare the processing times of standard R and sparklyr.

my_files_path <- "C:\\Users\\Jaan\\Documents\\MSDA\\data643\\DATA643_FP\\"

t_spark_conn = Sys.time()

# build a custom Spark configuration before connecting
config = spark_config()
# point the JVM's temp directory at the sparklyr home directory
config$`sparklyr.shell.driver-java-options` <- paste0("-Djava.io.tmpdir=", 
                                                      spark_home_dir())
# allocate 4 Gb each to the driver and executor, plus 512 Mb of overhead
config$`sparklyr.shell.driver-memory` <- "4G"
config$`sparklyr.shell.executor-memory` <- "4G"
config$`spark.yarn.executor.memoryOverhead` <- "512"

sc <- spark_connect(master = "local", config = config, version = '2.3.0')
t_spark_conn = Sys.time() - t_spark_conn

The creation of sc via the spark_connect method took 38.18 seconds. I suspect this time would increase if the connection were to a remote cluster rather than a local one.
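As a quick sanity check on the new connection, the session can be queried directly (a minimal sketch; spark_web(sc) would additionally open the Spark UI in a browser):

# confirm the Spark version the local node is running
spark_version(sc)

# list tables registered with this connection (none yet at this point)
src_tbls(sc)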

Next, I time the initial read-in durations for ratings_Video_Games.csv using base R's read.csv and then sparklyr's spark_read_csv:

t_readcsv <- Sys.time()
# initial read-in using standard R
ratb <- read.csv(paste0(my_files_path,"ratings_Video_Games.csv"), header = F,
                 col.names = c('reviewerID', 'asin', 'overall', 'unixReviewTime'))

t_readcsv = Sys.time() - t_readcsv

# initial read-in using sparklyr
t_readcsv_sp <- Sys.time()
rats <- spark_read_csv(sc, name = "ratings", 
                       header = FALSE, 
                       columns = c('reviewerID', 
                                   'asin', 'overall', 
                                   'unixReviewTime'),
                       path = paste0(my_files_path,"ratings_Video_Games.csv"), 
                       memory = F)
t_readcsv_sp = Sys.time() - t_readcsv_sp

# create df to hold durations of tasks
df_compare <- data.frame(Method = c("standard", "sparklyr"), 
                         Task = c("initial read", "initial read"), 
                         Duration = c(t_readcsv, t_readcsv_sp), 
                         `Bytes Used` = c(object_size(ratb), 
                                          object_size(rats)), 
                         stringsAsFactors = F)

With the results of that comparison stored in df_compare, I move on to query performance.

Query Performance

Another basic step in building a recsys is being able to review the data under consideration. In this section, I’ll compare standard and sparklyr with respect to the basic queries of:

  • obtain a table of users and their respective mean ratings
  • obtain a table of games and their respective mean ratings

The first set of queries computes the mean rating per user, followed by the mean rating per game. The durations are timed for standard R and sparklyr respectively.

# standard query for mean ratings among users, games
t_getmeans = Sys.time()
mean_urat <- ratb %>%
  group_by(reviewerID) %>%
  summarise(avg_urat = mean(overall, na.rm = T))

mean_irat <- ratb %>%
  group_by(asin) %>%
  summarise(avg_irat = mean(overall, na.rm = T))

t_getmeans = Sys.time() - t_getmeans

# spark query - mean ratings among users, games;
# collect() brings the results back into local R memory
t_getmeans_sp = Sys.time()
mean_irat_sp <- rats %>%
  group_by(asin) %>%
  summarise(avg_irat = mean(overall, na.rm = T)) %>% collect()

mean_urat_sp <- rats %>%
  group_by(reviewerID) %>%
  summarise(avg_urat = mean(overall, na.rm = T)) %>% collect()

t_getmeans_sp = Sys.time() - t_getmeans_sp
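
Before comparing timings, a quick spot-check (a minimal sketch) that the two methods agree on the per-game means:

# the five-number summaries of the per-game means should match
# across the standard and sparklyr results
summary(mean_irat$avg_irat)
summary(mean_irat_sp$avg_irat)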

Results comparison

Now I combine the results from the “read-in” and “query: mean ratings” sections and review:

df_compare2 <- data.frame(Method = c("standard", "sparklyr"), 
                          Task = c("query: mean ratings", "query: mean ratings"), 
                          Duration = c(t_getmeans, t_getmeans_sp), 
                          `Bytes Used` = c(
                            object_size(mean_urat) + object_size(mean_irat),
                            object_size(mean_urat_sp) + object_size(mean_irat_sp)), 
                          stringsAsFactors = F)

results <- rbind(df_compare, df_compare2)

# free-up memory... 
rm(mean_irat, mean_urat, mean_irat_sp, mean_urat_sp, 
   df_compare, df_compare2)

Method    Task                 Duration       Bytes.Used
standard  initial read         28.96790 secs    89657224
sparklyr  initial read         34.07009 secs       10432
standard  query: mean ratings  64.67783 secs    73686880
sparklyr  query: mean ratings  40.40431 secs    70178312

The table above gives a useful measure of the trade-offs of using even a local cluster with sparklyr.

For example, for query: mean ratings, sparklyr's calculation time was roughly 60% of the in-memory time, though its initial read was a few seconds slower. The Bytes.Used column is also smaller for sparklyr on both tasks, but the near-zero figure for its initial read deserves a caveat: rats is not a local copy of the data, only a reference to a table living in the Spark session, so object_size measures just the handle.
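A minimal sketch illustrating that point, assuming ratb and rats are still in scope:

# ratb is a full local data.frame; rats is only a handle to a Spark table
class(ratb)        # "data.frame"
class(rats)        # "tbl_spark" "tbl_sql" ...
object_size(ratb)  # tens of Mb held locally
object_size(rats)  # a few Kb; the data itself stays in the JVM

# row counts confirm both point at the same underlying data
nrow(ratb)
sdf_nrow(rats)

Further, I've included the below plots for comparing the duration between the methods.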

p1 <- results %>% filter(Task == "initial read") %>% 
  ggplot(aes(x = factor(Method), y = Duration, fill = factor(Method))) + 
  geom_col() + xlab(NULL) + labs(title = "Duration of Initial Read", 
                    subtitle = "Spark connection duration not included.") + 
  theme(legend.position="none")

p2 <- results %>% filter(Task == "query: mean ratings") %>% 
  ggplot(aes(x = factor(Method), y = Duration, fill = factor(Method))) + 
  geom_col() + 
  xlab(NULL) + labs(title = "Duration of Mean-Ratings", 
                    subtitle = "(Users and Games)") + 
  theme(legend.position="none")

# arrange the two ggplots side by side; par(mfrow) applies only to base
# graphics and has no effect on grid-based plots
gridExtra::grid.arrange(p1, p2, ncol = 2)

As shown in the above plots, the speed gain from even a local cluster is dramatic for the query task, while the initial read times are comparable; for the aggregation work, sparklyr using Spark comes out well ahead.

Spark’s Lazy Evaluation

The key to Spark's performance appears to be its lazy evaluation. Otherwise known as "call-by-need" (a better descriptor, in my opinion), computations are deferred until their results are actually needed. In particular, sparklyr didn't start calculating anything until the collect command was added; collect is what brings the results back into local memory.
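A minimal sketch of this behavior, assuming rats is still registered from the read-in step:

# building the pipeline returns almost instantly; nothing is computed yet,
# sparklyr only translates the dplyr verbs into Spark SQL
lazy_means <- rats %>%
  group_by(asin) %>%
  summarise(avg_irat = mean(overall, na.rm = TRUE))

# show_query() reveals the generated SQL without executing it
show_query(lazy_means)

# only collect() triggers the Spark job and ships the rows back to R
head(collect(lazy_means))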

Considerations

Because the connection to Spark takes a while to establish, sparklyr isn't practical for any dataset that runs comfortably in native R; anything under roughly 100 Mb wouldn't be worth the overhead. On my computer, 100 Mb is also about the point where native processing starts to struggle.

Beyond that size, sparklyr provides a way to crunch data that you'd never be able to handle in local memory.

Below, I get the mean rating per game from Video_Games_5.json - a file my computer was unable to load natively, crashing in the attempt.

t_300mb = Sys.time()
# stream the 312 Mb JSON without caching it into Spark memory
rvw5 <- spark_read_json(sc, name = "five_reviews",
                        path = paste0(my_files_path, "Video_Games_5.json"), 
                        memory = F)

# define (but do not yet execute) the mean rating per game
mean_irat5 <- rvw5 %>%
  group_by(asin) %>%
  summarise(avg_irat5 = mean(overall, na.rm = T))

t_300mb = Sys.time() - t_300mb
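
Consistent with the lazy-evaluation point above, mean_irat5 is still an unexecuted query here, so t_300mb mostly reflects the JSON read and schema inference. A sketch of forcing the computation when the results are actually needed:

# collect() triggers the scan and aggregation in Spark, returning
# only the small summarised result to R
mean_irat5 %>% head(10) %>% collect()

Next, I point Spark at the 10.3 Gb metadata.json, keeping just a few columns: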


# reference the 10.3 Gb metadata file, keeping only three columns
meta <- spark_read_json(sc, name = "meta",
                        path = paste0(my_files_path, "metadata.json"),
                        memory = F) %>% 
  select(asin, brand, title)

toc1 = Sys.time()
head(meta)
## # Source:   lazy query [?? x 3]
## # Database: spark_connection
##   asin       brand      title                                             
##   <chr>      <chr>      <chr>                                             
## 1 0001048791 <NA>       The Crucible: Performed by Stuart Pankin, Jerome ~
## 2 0000143561 <NA>       Everyday Italian (with Giada de Laurentiis), Volu~
## 3 0000037214 Big Dreams Purple Sequin Tiny Dancer Tutu Ballet Dance Fairy~
## 4 0000032069 BubuBibi   Adult Ballet Tutu Cheetah Pink                    
## 5 0000031909 Unknown    Girls Ballet Tutu Neon Pink                       
## 6 0000032034 BubuBibi   Adult Ballet Tutu Yellow
toc1 = Sys.time() - toc1

To read in the 300+ Mb file and stage the mean-rating-per-game query took 7.81 seconds - a "great" time given that I couldn't do it in local memory at all.

Further, obtaining head(meta) from that 10.3 Gb file took 5.90 seconds. That's very reasonable given that I wouldn't be able to load the file at all just to check its head in standard R.
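As a final sketch of where this is heading for the Final Project (hypothetical code, assuming the tables above are still registered), the per-game means can be joined to the metadata titles entirely within Spark, so only the small joined result ever crosses into local memory, and the session can then be closed:

# join mean ratings to game titles inside Spark, then pull back
# only the top ten rows
top_rated <- mean_irat5 %>%
  inner_join(meta, by = "asin") %>%
  arrange(desc(avg_irat5)) %>%
  head(10) %>%
  collect()

# close the Spark session when finished
spark_disconnect(sc)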
