The original plan for this final project was to build a recommender system using AWS platform and experiment with hybrid recommender systems as well as various data features. Unfortunately, our team had little to no experience with AWS, Hadoop, or distributed systems. We quickly discovered that trying to run the Spark code in a local environment on 1MM+ ratings is problematic and we quickly ran out of resources. Since our smaller projects covered a lot of aspects of recommender system theory, we have decided for this project to concentrate on building the distributed environment. Configuring the environment proved to be very challenging and frustrating.
It was difficult to split up the project tasks into distinct parts so we opted to independently attempted various approaches simultaneously - standalone cluster, EMR cluster, etc. - sharing knowledge, configurations, and problems as we encountered them.
Although we have only scratched the surface of big data processing and distributed computing, this project has provided tremendous experience through the challenges we faced in deploying a robust recommender system on AWS. Further, and perhaps more applicable to our Data Science careers as a whole, this project has exposed us to how common data organization and processing can be achieved on a large scale.
We are using the data set provided by Julian McAuley at UC San Diego (http://jmcauley.ucsd.edu/data/amazon/). Since the goal of this project is implementation, we went with an easily accessible data set. It includes product ratings for video games at Amazon. Data covers reviews from May 1996 to July 2014.
The data set includes over 800,000 users and 50,000 products with over 1,300,000 ratings. In addition to simple ratings, the data set includes actual reviews as well as metadata about the items, such as description and price. Below, a single review record with associated metadata
{"reviewerID":"A361M14PU2GUEG","asin":"0700099867","reviewerName":"Angry Ryan \"Ryan A. Forrest\"","helpful":[2,2],"reviewText":"I had Dirt 2 on Xbox 360 and it was an okay game. I started playing games on my laptop and bought a few new games to build my collection. This game is very fun to play. It is so much better than Dirt 2. If you like racing games you should check this out. The graphics are perfect on my compter.","overall":4,"summary":"DIRT 3","unixReviewTime":1308009600,"reviewTime":"06 14, 2011"}
Prior to starting this project, our team had very limited AWS experience and our collective work with Hadoop, Spark, and S3 was almost nil. Despite these challenges, we decided AWS promised to provide the most customizable approach for distributed computing framework. And coupled with RStudio’s interface to Apache Spark: sparklyr
, we were provided a relatively familiar and simple framework for handling large data sets.
All data files are stored using AWS S3 platform. One of the challenges we encountered was reading data from S3 directly into Hadoop/Spark. The problem stemmed from configuration set up as sparklyr
refused to locate S3 storage. One of the workarounds we attempted was using the aws.s3
R library to read the data from S3 into a local data frame. However, then copying the data to Spark was too slow to be usable. The copy_to
process in sparklyr
is not designed to handle large data frames. Eventually, this issue was resolved and we were able to read directly into Spark.
Lack of clear examples or instructions in setting up Spark in a distributed environment (preferably on AWS) proved to be one of the biggest challenges. The steps needed for standing up a distributed computing environment haven’t completely hit the mainstream so documentation we encountered was out of date or referencing technologies that were falling out of favor in the industry. Even sparklyr
documentation is still referencing version 1.6.2 while the product is well into version 2.0. Our solution was to build upon ideas based on a mix of various posts, examples, and articles. AWS documentation was dense, but generally helpful and up-to-date.
Our individual research into different configurations yielded a number of false starts. For example, We attempted to set up a standalone cluster using 4 AWS EC2 instances - one master node and 3 workers. Although this setup appeared to be possibly the easiest one, proper deployment of all components proved difficult and missing or out of date components were included.
AWS provides a lot of flexibility, but it also has a lot of moving parts that need to be accounted for. This is challenging to any individual without prior experience with AWS or with deployment of distributed systems. At one point, Ilya’s AWS account was disabled because access information was shared in a public GitHub repository and at another, Jaan’s AWS billing account was forecasting $400/month charge for computing services (ended up being less than $20).
An easier solution, but not without its challenges, proved to be AWS’ EMR product. It can be used to deploy variety of platforms and provides easier management and scaling of deployed cluster. We utilized Hadoop 2.7.3 and Spark 2.1.0 in an EMR cluster containing 1 master node and 2 core nodes. Nodes are based on m3.xlarge instance with 8 cores, 15 GB of memory and 80 GB of SSD space. See Appendix A for setup steps.
The code below uses Spark to build a simple ALS collaborative filtering model and evaluate it. We ran through the code multiple times with various settings. Below is timing in seconds of several steps in the workflow. The table includes results for a cluster with 2 worker nodes, a cluster with 3 worker nodes and a cluster with 3 worker nodes and adjusted settings.
Process | 2 Nodes | 3 Nodes | 3 Custom Nodes |
---|---|---|---|
Establishing Spark Connection | 31.12 | 32.57 | 35.67 |
Importing Data from S3 to Spark | 40.74 | 44.83 | 33.37 |
Conversion of IDs from String to Integer | 18.21 | 18.08 | 14.01 |
Building Model | 262.85 | 261.77 | 187.51 |
Predicting Ratings | 0.14 | 0.20 | 0.18 |
Transferring Prediction to Data Frame | 391.22 | 262.45 | 447.11 |
Scaling from 2-node to 3-node environment was very simple with AWS EMR, but it did not significantly impact the results. It was clear through our experience that in order to provide efficiency Spark environment must be optimized with explicit settings. We have tried adjusting executor settings - memory used, number of cores and instances. With some changes we saw significant improvement in building a model.
The RMSE value for all models was around 1.89. It is far from ideal, but as mentioned above our concentration was in platform implementation rather than strict recommender model optimization.
Unexpectedly setting up a distributed Spark environment proved to be a bear of a project. We set up a basic environment able to approach the data, but there are a number of possible improvements to be made - here are two main areas we have identified.
emr-5.5.2
)# Install libraries if needed
install.packages("tictoc")
install.packages("aws.s3")
install.packages("sparklyr")
# Set up data frame to track processing times
library(tictoc)
timing <- data.frame(Process=character(), Time=double())
library(sparklyr)
library(dplyr)
Sys.setenv(SPARK_HOME="/usr/lib/spark")
# Open Spark connection
tic()
step <- "Establish Spark Connection"
config <- spark_config()
#config$spark.executor.memory <- "8G"
#config$spark.executor.cores <- 2
#config$spark.executor.instances <- 3
#config$spark.dynamicAllocation.enabled <- "false"
sc <- spark_connect(master = "yarn-client", config = config, version = '2.1.0')
t <- toc(quiet = TRUE)
timing <- rbind(timing, data.frame(Process = step, Time = round(t$toc - t$tic, 2)))
# Set up S3 and read data to Spark
ctx <- spark_context(sc)
jsc <- invoke_static(sc, "org.apache.spark.api.java.JavaSparkContext", "fromSparkContext", ctx)
hconf <- jsc %>% invoke("hadoopConfiguration")
access_key <- "[Must be modified to include corresponding access key]"
secret_key <- "[Must be modified to include corresponding secret key]"
hconf %>% invoke("set","fs.s3a.access.key", access_key)
hconf %>% invoke("set","fs.s3a.secret.key", secret_key)
tic()
step <- "Read Data from S3 to Spark"
games <- spark_read_csv(sc, name = "games_df",
path = "s3a://data643summer18/ratings_Video_Games.csv",
overwrite = TRUE, header = FALSE)
t <- toc(quiet = TRUE)
timing <- rbind(timing, data.frame(Process = step, Time = round(t$toc - t$tic, 2)))
# Data exploration
head(games)
games <- games %>%
rename(UserID = V1) %>%
rename(ProductID = V2) %>%
rename(Rating = V3) %>%
rename(Timestamp = V4)
# Convert IDs to numeric for use in ALS
tic()
step <- "Data Conversion: String to Integer"
games <- games %>%
ft_string_indexer(input_col = "UserID", output_col = "UserIDn") %>%
ft_string_indexer(input_col = "ProductID", output_col = "ProductIDn")
t <- toc(quiet = TRUE)
timing <- rbind(timing, data.frame(Process = step, Time = round(t$toc - t$tic, 2)))
# Split for trainig and testing
games_partition <- sdf_partition(games, training = 0.8, testing = 0.2)
sdf_register(games_partition, c("games_train", "games_test"))
games_tidy_train <- tbl(sc, "games_train") %>%
select(UserIDn, ProductIDn, Rating)
# Build model
tic()
step <- "Building ALS Model"
sparkALS <- ml_als(games_tidy_train, max_iter = 5, nonnegative = TRUE,
rating_col = "Rating", user_col = "UserIDn", item_col = "ProductIDn")
t <- toc(quiet = TRUE)
timing <- rbind(timing, data.frame(Process = step, Time = round(t$toc - t$tic, 2)))
# Run prediction
games_test <- tbl(sc, "games_test")
tic()
step <- "Predict Ratings Using ALS Model"
games_pred <- ml_predict(sparkALS, games_test)
t <- toc(quiet = TRUE)
timing <- rbind(timing, data.frame(Process = step, Time = round(t$toc - t$tic, 2)))
tic()
step <- "Read Results into Data Frame"
prediction <- collect(games_pred)
t <- toc(quiet = TRUE)
timing <- rbind(timing, data.frame(Process = step, Time = round(t$toc - t$tic, 2)))
head(prediction)
# Calculate error
sparkPred <- prediction[!is.na(prediction$prediction), ] # Remove NaN due to data set splitting
rmseSpark <- sqrt(mean((sparkPred$Rating - sparkPred$prediction)^2))
rmseSpark
# Clean up
spark_disconnect(sc)