1 Introduction

Amazon is an international e-commerce giant, with 178 billion USD in net sales in 2017 and a massive portfolio of products offered online, ranging from electronics to fresh produce. Being a pioneer in the e-retail market has created a treasure trove of reviews for its products, but the review system can be abused by sellers or customers who write fake reviews in exchange for incentives.

1.1 Objective:

The purpose of this project is to produce quality recommendations by extracting insights from a large dataset.

1.2 Approach:

- I will be using the Amazon Fine Food Reviews dataset, which I found on Kaggle.

- I will apply recommender-system methods covered in the course, such as UBCF, IBCF, and SVD, and compare these models to see which one provides better results.

- I will then use Spark to do the processing in a distributed fashion and compare the performance and accuracy of the recommendations between the centralized (local) system and the distributed system.

2 Data Presentation

The dataset consists of reviews of fine foods from Amazon. The data span a period of more than 10 years, including all ~500,000 reviews up to October 2012. Each review includes product and user information, a rating (1-5), and a plain-text review. The dataset also includes reviews from all other Amazon categories.

For the first part, I'll be using the first 10,000 rows of the dataset.

Field Name               Description
Id                       unique identifier for the review
ProductId                unique identifier for the product
UserId                   unique identifier for the user
ProfileName              profile name of the user
HelpfulnessNumerator     number of users who found the review helpful
HelpfulnessDenominator   number of users who indicated whether they found the review helpful
Score                    rating from 1 to 5, with 1 being the worst and 5 the best
Time                     timestamp of the review
Summary                  brief summary of the review
Text                     full text of the review

2.1 Dataset Information:

Source: SNAP
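The setup chunk that loads the required packages is not shown in this output; based on the functions called throughout the report, it would need at least the following libraries:

library(dplyr)          # group_by, summarize, mutate, filter, %>%
library(knitr)          # kable
library(kableExtra)     # kable_styling
library(recommenderlab) # evaluationScheme, Recommender, calcPredictionAccuracy
library(sparklyr)       # spark_connect, sdf_copy_to, ml_als
library(tictoc)         # tic, toc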

df_csv <- read.csv("Reviews.csv",nrows = 10000)

kable(head(df_csv,2)) %>% kable_styling("striped", full_width = F)
Id ProductId UserId ProfileName HelpfulnessNumerator HelpfulnessDenominator Score Time Summary Text
1 B001E4KFG0 A3SGXH7AUHU8GW delmartian 1 1 5 1303862400 Good Quality Dog Food I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than most.
2 B00813GRG4 A1D87F6ZCVE5NK dll pa 0 0 1 1346976000 Not as Advertised Product arrived labeled as Jumbo Salted Peanuts...the peanuts were actually small sized unsalted. Not sure if this was an error or if the vendor intended to represent the product as "Jumbo".

2.2 Distribution of Ratings

[Figure: distribution of review scores (1-5)]
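The rendered chart is not reproduced in this export; a minimal sketch of code that would plot the score distribution, assuming ggplot2 is available:

library(ggplot2)

# bar chart of how many reviews fall into each score (1-5)
ggplot(df_csv, aes(x = factor(Score))) +
  geom_bar(fill = "steelblue") +
  labs(x = "Score", y = "Number of reviews")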

2.3 Data Wrangling


# get product counts
Count <- ungroup(df_csv) %>%
          group_by(ProductId) %>%
          summarize(Count=n()) %>%
          arrange(desc(Count))
## `summarise()` ungrouping output (override with `.groups` argument)
df_csv <- merge(df_csv, Count, by.x='ProductId', by.y='ProductId', all.x=T)

# drop unneeded columns, keeping ProductId, Id, UserId, ProfileName, Score, Summary, Text and Count
df2 <- df_csv[, c(1:4,7,9:11)]

# strip the '#oc-' prefix found in some UserId values
df2$UserId <- gsub('#oc-', '', df2$UserId)

# trim white space
df2[, c(1:6)] <- lapply(df2[, c(1:6)], trimws)

# make Score numeric
df2$Score <- as.numeric(df2$Score)

df3 <- ungroup(df2) %>%
            group_by(ProductId) %>%
            mutate(combine_summary = paste0(Summary, collapse = ' '))

# drop products with fewer reviews than the median count
medianCount <- median(df2$Count)

data_matrix <- ungroup(df3) %>%
            filter(Count >= medianCount)

# keep only UserId, ProductId and Score
data_matrix <- data_matrix[, c(3,1,5)]

# remove duplicates
data_matrix <- data_matrix[!duplicated(data_matrix[,c(1,2)]),]

# final dataset for recommender system
kable(head(data_matrix)) %>% kable_styling("striped", full_width = F)
UserId ProductId Score
A22Z9R91N8L7IQ B00016UX0K 5
A3613OHSEBB8RR B00016UX0K 5
A21VGNU5959O85 B00016UX0K 5
A1MLASS8T7VQJG B00016UX0K 5
A1DQ2B9OB6IW18 B00016UX0K 5
A2TQFD9OIALL9M B00016UX0K 5
## Using Score as value column: use value.var to override.
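The chunk that pivots data_matrix into the rlab rating matrix used in the next section is not shown in this output; the "Using Score as value column" message above is what reshape2 prints when value.var is left unspecified. A minimal sketch of that hidden step, assuming reshape2 is used:

library(reshape2)
library(recommenderlab)

# pivot the long ratings into a UserId x ProductId matrix of Scores
# (this prints "Using Score as value column: use value.var to override.")
rating_wide <- acast(data_matrix, UserId ~ ProductId)

# coerce to recommenderlab's sparse rating-matrix class
rlab <- as(rating_wide, "realRatingMatrix")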

2.4 Define the training and test sets


set.seed(100)

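# 80/20 split; given = 1 reveals a single rating per test user ("known") and
# holds out the rest ("unknown"); ratings of 3 or above count as good ratings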
eval_sets <- evaluationScheme(data = rlab, method = "split", train = 0.8, given = 1, goodRating = 3)

eval_sets
## Evaluation scheme with 1 items given
## Method: 'split' with 1 run(s).
## Training set proportion: 0.800
## Good ratings: >=3.000000
## Data set: 4402 x 61 rating matrix of class 'realRatingMatrix' with 4913 ratings.
#Evaluation datasets
eval_train = getData(eval_sets, "train")
eval_known = getData(eval_sets, "known")
eval_unknown = getData(eval_sets, "unknown")

3 Build several Recommender Models and run some Predictions


# Popular
pop_train <- Recommender(eval_train, method = "POPULAR")
pop_preds <- predict(pop_train, eval_known, type = "ratings")

# SVD
svd_train = Recommender(eval_train, "SVD")
svd_preds = predict(svd_train, eval_known, type = "ratings")

# Random
random_train = Recommender(eval_train, "RANDOM")
random_preds = predict(random_train, eval_known, type = "ratings")


accuracy = rbind(
  SVD = calcPredictionAccuracy(svd_preds, eval_unknown),
  POPULAR = calcPredictionAccuracy(pop_preds, eval_unknown),
  RANDOM = calcPredictionAccuracy(random_preds, eval_unknown)
  )


# Show the accuracy of the 3 Recommender Models
acc_df = round(as.data.frame(accuracy), 3)
kable(acc_df[order(acc_df$RMSE),])%>%
kable_styling("striped", full_width = F)
RMSE MSE MAE
RANDOM 0.787 0.620 0.533
SVD 2.296 5.271 0.761
POPULAR 2.297 5.277 0.769
## POPULAR run fold/sample [model time/prediction time]
##   1  [0.003sec/1.891sec] 
## SVD run fold/sample [model time/prediction time]
##   1  [0.03sec/0.158sec] 
## RANDOM run fold/sample [model time/prediction time]
##   1  [0.001sec/0.158sec]

## 8.363 sec elapsed
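Section 1.2 also mentions UBCF and IBCF. They are not part of the comparison above, but they could be added under the same evaluation scheme using the corresponding recommenderlab method names; a sketch, not run here:

# User-based collaborative filtering
ubcf_train <- Recommender(eval_train, method = "UBCF")
ubcf_preds <- predict(ubcf_train, eval_known, type = "ratings")

# Item-based collaborative filtering
ibcf_train <- Recommender(eval_train, method = "IBCF")
ibcf_preds <- predict(ibcf_train, eval_known, type = "ratings")

# extend the accuracy table with the two neighborhood models
accuracy <- rbind(accuracy,
  UBCF = calcPredictionAccuracy(ubcf_preds, eval_unknown),
  IBCF = calcPredictionAccuracy(ibcf_preds, eval_unknown))

With given = 1, these neighborhood methods see only a single rating per test user, so their results on this matrix should be interpreted cautiously.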

4 Distributed Recommender System - Spark Implementation


df <- read.csv("Bigdata_Reviews.csv",nrows = 100000)

# keep the UserId, ProductId and Score columns
ratings <- df[, c(4,2,9)]

# ALS needs numeric user and item ids; convert the id strings to integer
# codes via factors (plain as.numeric() on character ids would produce NAs)
ratings$UserId <- as.numeric(as.factor(ratings$UserId))
ratings$ProductId <- as.numeric(as.factor(ratings$ProductId))
ratings$Score <- as.numeric(ratings$Score)

tic()

# connect to Spark locally
sc <- spark_connect(master = "local")

# Spark data processing
spark_ratings <- sdf_copy_to(sc, ratings, overwrite = TRUE)

head(spark_ratings)
## # Source: spark<?> [?? x 3]
##   UserId ProductId Score
##    <dbl>     <dbl> <dbl>
## 1 188647     27620     5
## 2  25106     72384     1
## 3 210483     15268     4
## 4 152636     19719     2
## 5  57805     69008     5
## 6 213951     69008     4
# Split for training and testing
partitions <- sdf_random_split(spark_ratings, training = 0.8, testing = 0.2)
#sdf_register(partitions, c("spark_train", "spark_test"))

spark_train <- partitions$training
spark_test <- partitions$testing

spark_recommender_model <- ml_als(spark_train, max_iter = 5, nonnegative = TRUE,
                    rating_col = "Score", user_col = "UserId", item_col = "ProductId")

spark_recom <- spark_recommender_model$.jobj %>%
  invoke("transform", spark_dataframe(spark_test)) %>% collect()

spark_recom <- spark_recom[!is.na(spark_recom$prediction), ]
head(spark_recom)
## # A tibble: 6 x 4
##   UserId ProductId Score prediction
##    <dbl>     <dbl> <dbl>      <dbl>
## 1  10169       115     5     1.43  
## 2  74015       115     4     1.75  
## 3  44369       115     1     0.0892
## 4 249538       115     5     1.48  
## 5 220762       282     5    10.0   
## 6 162583      1250     5     4.95
# Evaluating Model
spark_mse <- mean((spark_recom$Score - spark_recom$prediction)^2)
spark_rmse <- sqrt(spark_mse)
spark_mae <- mean(abs(spark_recom$Score - spark_recom$prediction))

spark_eval_accuracy <- data.frame(RMSE = spark_rmse, MSE = spark_mse , MAE = spark_mae)

kable(spark_eval_accuracy) %>% kable_styling("striped", full_width = F)
RMSE MSE MAE
1.682309 2.830163 1.13684
spark_disconnect(sc)

elapsed_time2 <- toc(quiet = FALSE)
## 71.719 sec elapsed
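As an aside, the prediction and NA-filtering steps above could also be written with sparklyr's high-level helpers instead of invoking the underlying Java object. A sketch, assuming the same training/test split (it would have to run before spark_disconnect()):

# cold_start_strategy = "drop" tells Spark to drop test rows whose user or
# product was never seen in training, which replaces the manual is.na() filter
als_model <- ml_als(spark_train, max_iter = 5, nonnegative = TRUE,
                    rating_col = "Score", user_col = "UserId",
                    item_col = "ProductId", cold_start_strategy = "drop")

# ml_transform() appends a "prediction" column to the test set
spark_recom_alt <- ml_transform(als_model, spark_test) %>% collect()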

5 Conclusion

In my previous project (Project 5), I could fairly compare the speed and accuracy of Spark, a distributed system, against the centralized (local) system, because both ran the same recommender model on the same amount of data. In this final project such a comparison is not appropriate, since I used only a small sample of the data with the regular recommenderlab models and a much larger amount of data in Spark.

Beyond performance, the main advantage of Spark, or of a distributed system in general, is reliability: if one machine crashes, the system as a whole can still survive.

The RMSE of the RANDOM prediction model is still lower than the RMSE of the ALS prediction model in Spark. This may be because the first dataset is smaller and cleaner than the dataset I used in Spark.