The goal of this project is to give you practice working with a distributed recommender system. For this assignment, it is sufficient to build your application on a single node.

  • Adapt one of your recommendation systems to work with Apache Spark and compare the performance with your previous iteration. Consider the efficiency of the system and the added complexity of using Spark.

  • You may complete the assignment using PySpark (Python), SparkR (R), sparklyr (R), or Scala.

  • Please include in your conclusion: For your given recommender system’s data, algorithm(s), and (envisioned) implementation, at what point would you see moving to a distributed platform such as Spark becoming necessary?

set.seed(222)
# Libraries
library(recommenderlab)
library(sparklyr)
library(dplyr)
#library(tidyverse)

1 Data Loading and Spark

Connecting to Spark, using a local Spark installation as a single node

# Set up the Spark Home directory
Sys.setenv(SPARK_HOME="C:\\Users\\kumarn8\\AppData\\Local\\spark\\spark-2.2.0-bin-hadoop2.7")
sc <- spark_connect(master = "local",  config = spark_config())
data(Jester5k)
# Clip ratings to the valid Jester range [-10, 10]
Jester5k@data@x[Jester5k@data@x[] < -10] <- -10
Jester5k@data@x[Jester5k@data@x[] > 10] <- 10

# Create a subset of users (rows) with more than 80 ratings and jokes (columns) with more than 20 ratings
Jester5k_r <- Jester5k[rowCounts(Jester5k) > 80,  colCounts(Jester5k) > 20]
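In a recommenderlab rating matrix the rows are users and the columns are items, so rowCounts() filters users and colCounts() filters jokes. A minimal base-R sketch on a toy matrix, using a hypothetical helper filter_by_counts() (not part of recommenderlab), shows that both counts are taken on the original matrix before either filter is applied:

```r
# Toy rating matrix: rows are users, columns are jokes (NA = not rated).
ratings <- matrix(c( 5, NA,  2, NA,
                    NA, NA,  1, NA,
                     3,  4, NA,  1),
                  nrow = 3, byrow = TRUE,
                  dimnames = list(paste0("u", 1:3), paste0("j", 1:4)))

# Hypothetical helper: both counts come from the original matrix, then
# users (rows) and items (columns) are filtered at the same time.
filter_by_counts <- function(m, min_user, min_item) {
  user_counts <- rowSums(!is.na(m))  # ratings per user
  item_counts <- colSums(!is.na(m))  # ratings per item
  m[user_counts > min_user, item_counts > min_item, drop = FALSE]
}

filter_by_counts(ratings, min_user = 1, min_item = 1)
```

Users u1 and u3 and jokes j1 and j3 survive, yet u3 keeps only one rating among the surviving jokes, because the counts are not recomputed after filtering.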

start_time<- Sys.time()
e <- evaluationScheme(Jester5k_r, method = "split",train = 0.8, given = 30, goodRating = 3, k=5)
e
## Evaluation scheme with 30 items given
## Method: 'split' with 5 run(s).
## Training set proportion: 0.800
## Good ratings: >=3.000000
## Data set: 1701 x 100 rating matrix of class 'realRatingMatrix' with 167062 ratings.
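The scheme above puts 80% of the users in the training set; for each test user, 30 randomly chosen ratings are "known" to the recommender and the rest are held out ("unknown") to score the predictions, and k = 5 repeats the split five times. A minimal single-run sketch of that idea in base R, with a hypothetical split_given() helper and the assumption that every test user has at least 'given' ratings:

```r
set.seed(222)

# Hypothetical sketch of a "split" scheme with a fixed number of given ratings.
# ratings: user x item matrix with NA for missing entries. Every test user is
# assumed to have at least 'given' ratings.
split_given <- function(ratings, train = 0.8, given = 2) {
  train_idx <- sample(nrow(ratings), size = floor(train * nrow(ratings)))
  test <- ratings[-train_idx, , drop = FALSE]
  known <- test
  unknown <- test
  for (u in seq_len(nrow(test))) {
    rated <- which(!is.na(test[u, ]))
    keep <- rated[sample.int(length(rated), given)]
    known[u, setdiff(rated, keep)] <- NA  # recommender only sees 'given' ratings
    unknown[u, keep] <- NA                # the rest are held out for scoring
  }
  list(train = ratings[train_idx, , drop = FALSE], known = known, unknown = unknown)
}

toy <- matrix(round(runif(50, -10, 10), 2), nrow = 10)  # 10 users x 5 items
s <- split_given(toy, train = 0.8, given = 2)
sapply(s, nrow)
```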

2 Building the Model

# Create a user-based collaborative filtering (UBCF) model:
# Pearson similarity, Z-score normalization, 25 nearest neighbours
Rec.model <- Recommender(getData(e, "train"), "UBCF",
                         parameter = list(method = "pearson", normalize = "Z-score", nn = 25))

# Predict ratings for the test users from their 'known' (given) ratings
prediction_UJ <- predict(Rec.model, getData(e, "known"), type = "ratings")

# Clip predictions that fall outside the valid range to the boundary values
prediction_UJ@data@x[prediction_UJ@data@x < -10] <- -10
prediction_UJ@data@x[prediction_UJ@data@x > 10] <- 10

timeA <- Sys.time() - start_time
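UBCF normalizes each user's ratings, finds the most similar training users, and predicts unrated items from a similarity-weighted average of those neighbours' ratings. The following is a simplified base-R sketch for a single active user, with hypothetical helpers z_score(), pearson_sim() and ubcf_predict(); recommenderlab's own weighting and edge-case handling may differ in detail:

```r
# Hypothetical sketch of UBCF for one active user (not recommenderlab's code).
# train: user x item matrix (NA = missing); active: ratings over the same items.
z_score <- function(x) {
  m <- mean(x, na.rm = TRUE)
  s <- sd(x, na.rm = TRUE)
  list(z = (x - m) / s, mean = m, sd = s)
}

# Pearson correlation over the items both users rated.
pearson_sim <- function(a, b) {
  both <- !is.na(a) & !is.na(b)
  if (sum(both) < 2) return(NA_real_)
  suppressWarnings(cor(a[both], b[both]))
}

ubcf_predict <- function(train, active, nn = 25) {
  act <- z_score(active)
  train_z <- t(apply(train, 1, function(r) z_score(r)$z))  # normalize each user
  sims <- apply(train_z, 1, pearson_sim, b = act$z)
  nbr <- head(order(sims, decreasing = TRUE, na.last = NA), nn)  # nearest neighbours
  w <- sims[nbr]
  nz <- train_z[nbr, , drop = FALSE]
  rated <- !is.na(nz)
  # Similarity-weighted average of the neighbours' normalized ratings per item.
  num <- colSums(w * ifelse(rated, nz, 0))
  den <- colSums(abs(w) * rated)
  pred <- act$mean + act$sd * ifelse(den > 0, num / den, NA_real_)
  pred[!is.na(active)] <- NA  # only predict items the user has not rated
  pred
}

train <- rbind(c(5, 4, 1, 2),
               c(4, 5, 2, 1),
               c(1, 2, 5, 4))
ubcf_predict(train, active = c(5, 4, NA, 1), nn = 2)
```

The active user's ratings agree with the first two training users, who both disliked item 3, so the prediction for item 3 falls below the active user's mean.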
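# Convert the full Jester5k matrix (not the Jester5k_r subset) to a long
# data frame of (user, item, rating); factor ids become numeric level codes
jDF <- as(Jester5k, "data.frame")
jDF$user <- as.numeric(jDF$user)
jDF$item <- as.numeric(jDF$item)

# Copy the data frame into the local Spark session
jDF <- sdf_copy_to(sc, jDF, "spark_jester", overwrite = TRUE)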
jDF
## # Source: table<spark_jester> [?? x 3]
## # Database: spark_connection
##     user  item rating
##    <dbl> <dbl>  <dbl>
##  1  3385  1.00   7.91
##  2  3385 13.0    9.17
##  3  3385 24.0    5.34
##  4  3385 35.0    8.16
##  5  3385 46.0   -8.74
##  6  3385 57.0    7.14
##  7  3385 68.0    8.88
##  8  3385 79.0   -8.25
##  9  3385 90.0    5.87
## 10  3385  2.00   6.21
## # ... with more rows
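The conversion above turns the sparse rating matrix into one (user, item, rating) row per observed rating, which is the long format Spark's ALS works with, and replaces the factor ids with their integer codes. A base-R sketch of the same idea on a toy matrix, with a hypothetical to_triplets() helper:

```r
# Toy user x item matrix with NA for missing ratings and character ids.
m <- matrix(c(7.9,  NA, -8.7,
               NA, 9.2,  5.3),
            nrow = 2, byrow = TRUE,
            dimnames = list(c("u10", "u2"), c("j1", "j2", "j3")))

# Hypothetical helper: keep only observed cells as (user, item, rating) rows.
to_triplets <- function(m) {
  idx <- which(!is.na(m), arr.ind = TRUE)  # columns "row" and "col"
  data.frame(user = factor(rownames(m)[idx[, "row"]]),
             item = factor(colnames(m)[idx[, "col"]]),
             rating = unname(m[idx]))
}

trip <- to_triplets(m)
# Replace the factors by their integer level codes, as as.numeric() does above.
trip$user <- as.numeric(trip$user)
trip$item <- as.numeric(trip$item)
trip
```

factor() sorts its levels alphabetically by default, so "u10" gets code 1 and "u2" gets code 2; the numeric ids are level codes, not the original labels, so keep the levels if you need to map recommendations back.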

3 ML ALS Factorization Technique

ALS attempts to estimate the ratings matrix R as the product of two lower-rank matrices, X and Y, i.e. R ≈ X·Yᵀ. These matrices are typically called ‘factor’ matrices. The approach is iterative: in each iteration, one factor matrix is held constant while the other is solved for using least squares. The newly solved factor matrix is then held constant while solving for the other one.
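The alternating least-squares loop described above can be sketched in a few lines of base R. This is a minimal illustration with a hypothetical als() function, not Spark's implementation: it uses a plain L2 penalty lambda, whereas Spark's documentation describes scaling the regularization parameter by the number of ratings of each user or item.

```r
set.seed(1)

# Hypothetical minimal ALS for explicit ratings (not Spark's implementation).
# R: user x item matrix with NA for missing entries; k: rank; lambda: L2 penalty.
als <- function(R, k = 2, lambda = 0.1, iters = 10) {
  X <- matrix(rnorm(nrow(R) * k, sd = 0.1), nrow(R), k)  # user factors
  Y <- matrix(rnorm(ncol(R) * k, sd = 0.1), ncol(R), k)  # item factors
  obs <- !is.na(R)
  # Regularized least squares for one row, holding the other factor matrix fixed:
  # solve (F'F + lambda I) w = F' r over the observed entries only.
  solve_row <- function(fixed, r, mask) {
    Fo <- fixed[mask, , drop = FALSE]
    solve(crossprod(Fo) + lambda * diag(k), crossprod(Fo, r[mask]))
  }
  for (it in seq_len(iters)) {
    for (u in seq_len(nrow(R))) X[u, ] <- solve_row(Y, R[u, ], obs[u, ])  # Y fixed
    for (i in seq_len(ncol(R))) Y[i, ] <- solve_row(X, R[, i], obs[, i])  # X fixed
  }
  list(X = X, Y = Y, fitted = X %*% t(Y))
}

R <- outer(1:3, c(1, 0.5, 2))  # rank-1 "ratings"
R[1, 3] <- NA                  # one missing entry to fill in
fit <- als(R, k = 2, lambda = 0.01, iters = 100)
round(fit$fitted, 2)
```

Each inner solve is a small k x k linear system per user or item, which is what makes ALS easy to parallelize: all users can be solved independently while Y is fixed, and vice versa.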

Spark's ALS is a blocked implementation that groups the two sets of factors (referred to as “users” and “products”) into blocks and reduces communication by sending only one copy of each user vector to each product block on each iteration, and only to the product blocks that need that user’s feature vector.
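To make the communication saving concrete, the sketch below counts how many user-vector copies would be shipped under a naive scheme (one per rating) versus the blocked scheme (one per distinct user and product block). The hypothetical count_messages() helper assigns items to blocks by id modulo the number of blocks; that partitioning is an assumption for illustration, not Spark's actual partitioner.

```r
# Hypothetical illustration of "one copy per user per product block".
# ratings: data frame of (user, item) pairs; items go to blocks by id %% n_item_blocks.
count_messages <- function(ratings, n_item_blocks) {
  item_block <- ratings$item %% n_item_blocks
  # Naive scheme: ship the user vector once per rating.
  naive <- nrow(ratings)
  # Blocked scheme: ship it once per distinct (user, item block) pair,
  # and only to blocks that contain at least one item the user rated.
  blocked <- nrow(unique(data.frame(user = ratings$user, block = item_block)))
  c(naive = naive, blocked = blocked)
}

ratings <- data.frame(user = c(1, 1, 1, 2, 2, 3),
                      item = c(1, 3, 5, 2, 4, 6))
count_messages(ratings, n_item_blocks = 2)
```

User 1 rated three items that all land in the same block, so one copy of its vector replaces three.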

# ALS on the explicit ratings (implicit preferences are not enabled here);
# each argument may be given only once, otherwise R raises an error
implicit_model <- ml_als_factorization(jDF,
                                       regularization.parameter = 0.01,
                                       iter.max = 10L)

summary(implicit_model)
##                         Length Class      Mode       
## uid                     1      -none-     character  
## param_map               4      -none-     list       
## rank                    1      -none-     numeric    
## recommend_for_all_items 1      -none-     function   
## recommend_for_all_users 1      -none-     function   
## item_factors            2      tbl_spark  list       
## user_factors            2      tbl_spark  list       
## user_col                1      -none-     character  
## item_col                1      -none-     character  
## prediction_col          1      -none-     character  
## .jobj                   2      spark_jobj environment

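Spark's implicit-feedback option is based on Hu, Koren and Volinsky, “Collaborative Filtering for Implicit Feedback Datasets”. They identify unique properties of implicit feedback datasets and propose treating the data as an indication of positive and negative preference associated with vastly varying confidence levels, which leads to a factor model especially tailored for implicit feedback recommenders. The Jester ratings are explicit scores, so the model below uses the default (explicit) ALS.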
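In that formulation, each raw observation r is split into a binary preference p (1 if r > 0, else 0) and a confidence c = 1 + alpha·r, and the squared error on p is weighted by c. A minimal base-R sketch with a hypothetical implicit_transform() helper; applying abs() so that negative values still get confidence of at least 1 is an assumption of this sketch:

```r
# Hypothetical sketch of the preference/confidence transform used by
# implicit-feedback ALS. r: raw observations; alpha: confidence scaling.
implicit_transform <- function(r, alpha = 1) {
  data.frame(
    r = r,
    preference = as.numeric(r > 0),  # p = 1 if r > 0, else 0
    # c = 1 + alpha * |r|; using |r| for negative values is an assumption here
    confidence = 1 + alpha * abs(r)
  )
}

implicit_transform(c(0, 1, 5, -3), alpha = 2)
```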

model_als <- ml_als_factorization(jDF, rating.column = "rating", user.column = "user", 
    item.column = "item", iter.max = 7)
summary(model_als)
##                         Length Class      Mode       
## uid                     1      -none-     character  
## param_map               4      -none-     list       
## rank                    1      -none-     numeric    
## recommend_for_all_items 1      -none-     function   
## recommend_for_all_users 1      -none-     function   
## item_factors            2      tbl_spark  list       
## user_factors            2      tbl_spark  list       
## user_col                1      -none-     character  
## item_col                1      -none-     character  
## prediction_col          1      -none-     character  
## .jobj                   2      spark_jobj environment
# Inspect the learned user factor matrix (one row of features per user)
model_als$user_factors
## # Source: table<sparklyr_tmp_5544521f24a8> [?? x 12]
## # Database: spark_connection
##       id feat~ featur~ featur~ featu~ featu~ featu~ featur~ featu~ featur~
##    <int> <lis>   <dbl>   <dbl>  <dbl>  <dbl>  <dbl>   <dbl>  <dbl>   <dbl>
##  1    10 <lis~  0.140  -0.213  -0.896 -1.46  -1.90  -0.271   1.14   1.68  
##  2    20 <lis~ -0.404   0.332  -1.15  -0.886 -0.712  0.0406  1.64   1.32  
##  3    30 <lis~ -0.384   1.33   -0.300  1.42  -1.38   1.08   -0.514 -0.166 
##  4    40 <lis~  0.0987  0.656  -0.532  0.243 -0.778  0.316   0.392  0.537 
##  5    50 <lis~  0.568   1.30   -1.70  -0.638 -2.33   0.681   0.550  0.149 
##  6    60 <lis~ -0.0425  1.27    0.476 -2.79  -1.40  -2.33    0.932  0.0739
##  7    70 <lis~ -1.23   -0.981  -1.18  -1.92  -1.12   0.681   1.87   1.56  
##  8    80 <lis~ -0.280  -0.0102 -1.26   1.08   1.02  -1.25   -1.56   0.315 
##  9    90 <lis~ -0.262   2.02   -0.713 -1.54  -1.67  -0.410  -0.267  0.0981
## 10   100 <lis~ -0.0133  1.26   -0.304 -1.46  -0.786 -0.826   2.25   0.103 
## # ... with more rows, and 2 more variables: features_9 <dbl>, features_10
## #   <dbl>
reco <- ml_recommend(model_als)  # top recommended item per user (n = 1 by default)
head(reco)
## # Source: lazy query [?? x 4]
## # Database: spark_connection
##    user recommendations  item rating
##   <int> <list>          <int>  <dbl>
## 1    12 <list [2]>         89  10.8 
## 2    13 <list [2]>         10  -5.74
## 3    14 <list [2]>         23   3.13
## 4    18 <list [2]>         24   7.21
## 5    25 <list [2]>         89   7.81
## 6    37 <list [2]>         21   6.71
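ALS scores a (user, item) pair as the dot product of the user's factor vector (a row of model_als$user_factors) and the item's factor vector, and a top-n recommendation keeps the highest-scoring items. A base-R sketch with small hypothetical factor matrices and a hypothetical recommend_top_n() helper; note that this sketch does not filter out items the user has already rated:

```r
# Hypothetical rank-2 factor matrices (not taken from model_als).
X <- matrix(c(1, 0,
              0, 1), nrow = 2, byrow = TRUE)  # 2 users
Y <- matrix(c(2, 0,
              1, 1,
              0, 3), nrow = 3, byrow = TRUE)  # 3 items

# Score every (user, item) pair as a dot product, then keep the n best items.
recommend_top_n <- function(X, Y, n = 1) {
  scores <- X %*% t(Y)  # users x items
  do.call(rbind, lapply(seq_len(nrow(scores)), function(u) {
    top <- order(scores[u, ], decreasing = TRUE)[seq_len(n)]
    data.frame(user = u, item = top, rating = scores[u, top])
  }))
}

recommend_top_n(X, Y, n = 1)
```

Because the score is unbounded, a recommended "rating" can exceed the Jester range, as the 10.8 for user 12 in the output above does.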

3.1 ml_predict

Using ml_predict() to predict ratings for the (user, item) pairs the model was trained on:

ml_predict(model_als, jDF)
## # Source: table<sparklyr_tmp_554457df56c1> [?? x 4]
## # Database: spark_connection
##     user  item  rating prediction
##    <dbl> <dbl>   <dbl>      <dbl>
##  1  12.0  12.0  9.08        6.28 
##  2  13.0  12.0 -9.61       -8.65 
##  3  14.0  12.0 -9.51       -7.09 
##  4  18.0  12.0 -9.13       -9.01 
##  5  25.0  12.0 -1.94        0.373
##  6  37.0  12.0 -2.67        2.44 
##  7  38.0  12.0 -5.10       -5.57 
##  8  46.0  12.0 -3.83       -4.47 
##  9  50.0  12.0 -2.77       -1.44 
## 10  52.0  12.0 -0.0500     -0.762
## # ... with more rows
spark_disconnect(sc)
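The output above pairs each observed rating with the model's prediction, which is enough to compute an error metric such as RMSE. The sketch below defines a hypothetical rmse() helper in base R and applies it to the ten rows printed above; because those predictions are made on the training data, the result is an in-sample and therefore optimistic estimate. On the full table, the same expression should also be computable inside Spark with dplyr's summarise() on the ml_predict() result, before disconnecting.

```r
# Root-mean-square error between observed ratings and predictions.
rmse <- function(actual, predicted) sqrt(mean((actual - predicted)^2))

# The first ten rows printed by ml_predict() above (in-sample predictions).
actual    <- c(9.08, -9.61, -9.51, -9.13, -1.94, -2.67, -5.10, -3.83, -2.77, -0.0500)
predicted <- c(6.28, -8.65, -7.09, -9.01, 0.373, 2.44, -5.57, -4.47, -1.44, -0.762)
rmse(actual, predicted)
```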

4 Conclusion

Spark is an efficient data processing framework and is widely used for processing and analyzing large data sets. In this example, sparklyr let us copy the Jester ratings into Spark and fit an ALS matrix factorization model in a few lines of code, and Spark's ALS implementation is designed for large, sparse rating matrices and for large-scale preprocessing. Because Spark distributes computation and runs it in parallel, it can help analyze data faster, particularly once the data no longer fits comfortably on a single machine, and it makes building data pipelines easier.
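One rough way to answer the assignment's question about when a distributed platform becomes necessary is to estimate the memory taken by the data structures a single-node approach builds. The sketch below assumes dense storage of 8-byte doubles; recommenderlab stores ratings sparsely, so the ratings figure is an upper bound, while a naive UBCF that materializes a dense user x user similarity matrix grows quadratically with the number of users. The larger catalogue size is hypothetical.

```r
# Back-of-envelope size of a dense matrix of 8-byte doubles, in MB.
dense_mb <- function(rows, cols) rows * cols * 8 / 1024^2

# Jester5k: 5000 users x 100 jokes, plus a naive dense user x user similarity matrix.
jester <- c(ratings = dense_mb(5000, 100), user_sim = dense_mb(5000, 5000))
# Hypothetical larger catalogue: 1 million users x 10,000 items.
larger <- c(ratings = dense_mb(1e6, 1e4), user_sim = dense_mb(1e6, 1e6))

round(jester, 1)
round(larger)
```

For Jester5k both structures fit easily in memory (roughly 4 MB and 191 MB), while the hypothetical larger case needs about 74.5 GiB for the ratings and about 7.3 TiB for the similarity matrix, which is the regime where distributing the data and using a factorization method such as ALS becomes necessary rather than optional.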