1 Objective

The goal of this project is to practice working with a distributed recommender system.

Issues to consider during the project:

  • Adapt one of your recommendation systems to work with Apache Spark and compare the performance with your previous iteration. Consider the efficiency of the system and the added complexity of using Spark.
  • For your given recommender system’s data, algorithm(s), and (envisioned) implementation, at what point would you see moving to a distributed platform such as Spark becoming necessary?

2 Data source and approach

For this project, we will work with the MovieLense 100k dataset of movie ratings, the same dataset I used in Project 4. It includes roughly 100,000 ratings of 1,664 movies by 943 users, with ratings given as integer values from 1 (worst) to 5 (best).

In Project 4 we used the recommenderlab package to develop and test recommender systems with several different algorithms. In this project, we will build a recommender using the alternating least squares (ALS) matrix factorization method from the MLlib library in Apache Spark, which we will access through the sparklyr package in local mode.

library(tidyverse)
library(recommenderlab)
library(knitr)
library(sparklyr)
library(dplyr)
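
If a local Spark installation is not already present, sparklyr can download and install one. A one-time setup step (the version matches the Spark releases discussed in section 4):

# one-time setup: install a local copy of Spark if none is present
# (2.4.3 was the current release at the time of writing)
if (nrow(spark_installed_versions()) == 0) spark_install(version = "2.4.3")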

3 Data preparation

We start by loading the data and preparing it for the recommender system. The full ratings matrix includes 99,392 ratings out of 943 × 1,664 = 1,569,152 possible user-movie pairs, which means the ratings matrix is about 94% sparse.

# load data
data("MovieLense")
m <- MovieLense

# examine the ratings matrix
head(colnames(m), 10)
##  [1] "Toy Story (1995)"                                    
##  [2] "GoldenEye (1995)"                                    
##  [3] "Four Rooms (1995)"                                   
##  [4] "Get Shorty (1995)"                                   
##  [5] "Copycat (1995)"                                      
##  [6] "Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)"
##  [7] "Twelve Monkeys (1995)"                               
##  [8] "Babe (1995)"                                         
##  [9] "Dead Man Walking (1995)"                             
## [10] "Richard III (1995)"
getRatingMatrix(m)[1:10, 1:30] 
## 10 x 30 sparse Matrix of class "dgCMatrix"
##                                                               
## 1  5 3 4 3 3 5 4 1 5 3 2 5 5 5 5 5 3 4 5 4 1 4 4 3 4 3 2 4 1 3
## 2  4 . . . . . . . . 2 . . 4 4 . . . . 3 . . . . . 4 . . . . .
## 3  . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
## 4  . . . . . . . . . . 4 . . . . . . . . . . . . . . . . . . .
## 5  4 3 . . . . . . . . . . . . . . 4 . . . 3 . . 4 3 . . . 4 .
## 6  4 . . . . . 2 4 4 . . 4 2 5 3 . . . 4 . 3 3 4 . . . . 2 . .
## 7  . . . 5 . . 5 5 5 4 3 5 . . . . . . . . . 5 3 . 3 . 4 5 3 .
## 8  . . . . . . 3 . . . 3 . . . . . . . . . . 5 . . . . . . . .
## 9  . . . . . 5 4 . . . . . . . . . . . . . . . . . . . . . . .
## 10 4 . . 4 . . 4 . 4 . 4 5 3 . . 4 . . . . . 5 5 . . . . . . .
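
As a quick sanity check on the sparsity figure quoted above, a minimal sketch using recommenderlab accessors:

# sanity-check sparsity: observed ratings vs. possible user-movie pairs
n_ratings <- length(getRatings(m))    # ~99,392 observed ratings
n_pairs <- nrow(m) * ncol(m)          # 943 users x 1,664 movies = 1,569,152
1 - n_ratings / n_pairs               # ~0.937, i.e. roughly 94% sparse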

In order to use the ALS matrix factorization function in the Apache Spark MLlib package, we need to recast the ratings matrix into a data frame with integer user and item IDs (with one caveat, noted after the output below). In the process we also save the actual movie names in case we need them later.

# recast ratings matrix into dataframe with numeric user & item IDs
movie_names <- colnames(m)
colnames(m) <- 1:ncol(m)
df <- as(m, "data.frame")          # returns user & item as factors
df$user <- as.integer(df$user)     # factor -> level codes (see caveat below)
df$item <- as.integer(df$item)
df$rating <- as.integer(df$rating)
str(df)
## 'data.frame':    99392 obs. of  3 variables:
##  $ user  : int  1 1 1 1 1 1 1 1 1 1 ...
##  $ item  : int  1 777 888 999 1110 1221 1332 1443 1554 2 ...
##  $ rating: int  5 3 4 3 3 5 4 1 5 3 ...
# view the dataframe
head(df)
##     user item rating
## 1      1    1      5
## 453    1  777      3
## 584    1  888      4
## 674    1  999      3
## 883    1 1110      3
## 969    1 1221      5
tail(df)
##       user item rating
## 93643  938   45      2
## 94227  938   67      2
## 94450  938   75      4
## 96451  938  201      3
## 97147  938  246      3
## 98112  938  359      3
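
One caveat is worth flagging here: as.integer() applied to a factor returns the factor’s level codes, not the numeric values of its labels. Because as(m, "data.frame") returns user and item as factors whose levels sort lexicographically ("1", "10", "100", ...), the integer IDs above are level indices rather than the original column positions; this explains the non-sequential item values (777, 888, ...) in the output. That is harmless for ALS, which only needs IDs to be consistent, but it matters if we want to map item IDs back to movie_names. A sketch of the label-preserving conversion, should the original column numbering be needed:

# sketch: convert via as.character() to recover the numeric labels,
# so that item IDs index the movie_names vector directly
df2 <- as(m, "data.frame")
df2$user <- as.integer(as.character(df2$user))
df2$item <- as.integer(as.character(df2$item))
df2$rating <- as.integer(df2$rating)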

Next we establish a connection to Apache Spark in local mode, and copy the dataframe to a Spark table.

sc <- spark_connect(master = "local")
ratings_tbl <- copy_to(sc, df, "ratings", overwrite = TRUE)
src_tbls(sc)
## [1] "ratings"
class(ratings_tbl)
## [1] "tbl_spark" "tbl_sql"   "tbl_lazy"  "tbl"
glimpse(ratings_tbl)
## Observations: ??
## Variables: 3
## Database: spark_connection
## $ user   <int> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1...
## $ item   <int> 1, 777, 888, 999, 1110, 1221, 1332, 1443, 1554, 2, 113,...
## $ rating <int> 5, 3, 4, 3, 3, 5, 4, 1, 5, 3, 2, 5, 5, 5, 5, 5, 3, 4, 5...
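
The "Observations: ??" line appears because the Spark table is lazy, so rows are not counted until a query runs. A quick check that all ratings made it across:

# count rows on the Spark side (glimpse shows "??" for lazy tables)
sdf_nrow(ratings_tbl)   # expect 99,392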

Now we review the ratings and plot the distributions of rating counts and average ratings, per user and per movie. Unfortunately, the code below is not entirely stable: it sometimes fails at the collect() calls that retrieve query results from Apache Spark, which may be attributable to integration issues between RStudio and Spark. Run as a plain R script, the same code completes and produces the histograms without fail.
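
As a pragmatic workaround for the intermittent failures, one could wrap collect() in a small retry helper; a sketch, not a tested fix:

# sketch: retry a flaky collect() a few times before giving up
collect_retry <- function(tbl, tries = 3) {
    for (i in seq_len(tries)) {
        result <- tryCatch(collect(tbl), error = function(e) NULL)
        if (!is.null(result)) return(result)
        Sys.sleep(1)   # brief pause before the next attempt
    }
    stop("collect() failed after ", tries, " attempts")
}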

# distribution of counts & avg rating per user
by_user <- ratings_tbl %>% 
    group_by(user) %>% 
    summarise(count = n(), avg = mean(rating, na.rm = TRUE)) %>% 
    collect()
head(by_user)
## # A tibble: 6 x 3
##    user count   avg
##   <int> <dbl> <dbl>
## 1   556   208  3.64
## 2    13   180  3.46
## 3    46    98  4.09
## 4   101    19  3.63
## 5   135   127  3.37
## 6   157    68  4.32
ggplot(by_user, aes(count)) + geom_histogram() + 
    labs(title = "Distribution of Rating Count per User", 
         x = "Rating Count")

ggplot(by_user, aes(avg)) + geom_histogram() + 
    labs(title = "Distribution of Average Rating per User", 
         x = "Average Rating")

summary(by_user)    
##       user           count            avg       
##  Min.   :  1.0   Min.   : 19.0   Min.   :1.497  
##  1st Qu.:236.5   1st Qu.: 32.0   1st Qu.:3.325  
##  Median :472.0   Median : 64.0   Median :3.619  
##  Mean   :472.0   Mean   :105.4   Mean   :3.588  
##  3rd Qu.:707.5   3rd Qu.:147.5   3rd Qu.:3.871  
##  Max.   :943.0   Max.   :735.0   Max.   :4.870
# distribution of counts & avg rating per movie
by_item <- ratings_tbl %>% 
    group_by(item) %>% 
    summarise(count = n(), avg = mean(rating, na.rm = TRUE)) %>% 
    collect() 
head(by_item)
## # A tibble: 6 x 3
##    item count   avg
##   <int> <dbl> <dbl>
## 1   777   131  3.21
## 2   999   209  3.55
## 3   446   183  3.97
## 4   668    39  3.21
## 5   766    69  3.96
## 6   822   174  3.45
ggplot(by_item, aes(count)) + geom_histogram() + 
    labs(title = "Distribution of Rating Count per Movie", 
         x = "Rating Count")

ggplot(by_item, aes(avg)) + geom_histogram() + 
    labs(title = "Distribution of Avg. Rating per Movie",
         x = "Average Rating")

summary(by_item)    
##       item            count             avg       
##  Min.   :   1.0   Min.   :  1.00   Min.   :1.000  
##  1st Qu.: 416.8   1st Qu.:  7.00   1st Qu.:2.665  
##  Median : 832.5   Median : 27.00   Median :3.162  
##  Mean   : 832.5   Mean   : 59.73   Mean   :3.077  
##  3rd Qu.:1248.2   3rd Qu.: 80.00   3rd Qu.:3.653  
##  Max.   :1664.0   Max.   :583.00   Max.   :5.000
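
For reference, the most-rated titles could be surfaced as follows. This is a sketch: it assumes the label-preserving ID conversion from the caveat in section 3 was used to build the Spark table, so that item IDs index movie_names directly.

# sketch: ten most-rated movies, assuming item IDs match column positions
by_item %>% 
    arrange(desc(count)) %>% 
    head(10) %>% 
    mutate(title = movie_names[item])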

4 Recommender model

We will develop a recommender model based on the alternating least squares (ALS) algorithm, using the ml_als function in sparklyr. We follow these steps:

  • Partition the ratings data into training and test sets in an 80/20 split
  • Train the model using the training set partitions$training
  • Use the model to predict ratings for the test set partitions$test
  • Compute the model accuracy in terms of the root mean squared error (RMSE) metric.

However, as we can see below, the ml_als function call runs into problems on Apache Spark, and Spark ends up terminating the process with a StackOverflowError. I tried the ml_als_factorization function referenced in this week’s class materials, but that function was deprecated in an earlier version of sparklyr. I also logged a new issue on the sparklyr issues page; the developer’s response was to try installing a different version of the local Spark application. I tried Spark versions 2.4.3 (the current release at the time of writing), 2.3, 2.2, and 2.1, but the ml_als call failed with all of them.
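
Two mitigations sometimes suggested for this kind of StackOverflowError are sketched below; I was not able to verify either one here. The first sets a checkpoint directory, since ALS periodically checkpoints intermediate RDDs to truncate the long lineage chains that typically cause the overflow; the second reconnects with a larger driver thread stack and more driver memory.

# sketch (untested): possible mitigations for the error shown below
# 1. set a checkpoint directory so ALS can truncate its RDD lineage
spark_set_checkpoint_dir(sc, tempdir())

# 2. reconnect with more driver memory and a larger JVM thread stack
#    (the ratings table would then need to be copied to Spark again)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "4g"
config$`sparklyr.shell.driver-java-options` <- "-Xss64m"
spark_disconnect(sc)
sc <- spark_connect(master = "local", config = config)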

# partition the ratings
partitions <- ratings_tbl %>% 
    sdf_random_split(training = 0.8, test = 0.2, seed = 314159)

# fit the model on the training set
als_model <- partitions$training %>% 
    ml_als(rating ~ user + item)
## Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 35.0 failed 1 times, most recent failure: Lost task 0.0 in stage 35.0 (TID 181, localhost, executor driver): java.lang.StackOverflowError
##  at java.io.ObjectInputStream$BlockDataInputStream.readByte(Unknown Source)
##  at java.io.ObjectInputStream.readHandle(Unknown Source)
##  at java.io.ObjectInputStream.readClassDesc(Unknown Source)
##  at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
##  at java.io.ObjectInputStream.readObject0(Unknown Source)
##  at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
##  at java.io.ObjectInputStream.readSerialData(Unknown Source)
##  at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
##  at java.io.ObjectInputStream.readObject0(Unknown Source)
##  at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
##  at java.io.ObjectInputStream.readSerialData(Unknown Source)
##  at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
##  at java.io.ObjectInputStream.readObject0(Unknown Source)
##  at java.io.ObjectInputStream.readObject(Unknown Source)
##  at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
##  at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
##  at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
##  at java.lang.reflect.Method.invoke(Unknown Source)
##  at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
##  at java.io.ObjectInputStream.readSerialData(Unknown Source)
##  at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
##  at java.io.ObjectInputStream.readObject0(Unknown Source)
##  at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
##  at java.io.ObjectInputStream.readSerialData(Unknown Source)
##  at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
##  at java.io.ObjectInputStream.readObject0(Unknown Source)
##  at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
##  at java.io.ObjectInputStream.readSerialData(Unknown Source)
##  at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
##  at java.io.ObjectInputStream.readObject0(Unknown Source)
##  at java.io.ObjectInputStream.readObject(Unknown Source)
##  ... (the readObject/readSerialData/readOrdinaryObject frames above repeat many times before the trace is truncated) ...
## Error: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
## This stopped SparkContext was created at:
## 
## org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
## sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
## sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
## sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
## java.lang.reflect.Method.invoke(Unknown Source)
## sparklyr.Invoke.invoke(invoke.scala:147)
## sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
## sparklyr.StreamHandler.read(stream.scala:66)
## sparklyr.BackendHandler.channelRead0(handler.scala:51)
## sparklyr.BackendHandler.channelRead0(handler.scala:4)
## io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
## io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
## io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
## io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
## io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
## io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
## io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
## io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
## io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
## io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
## 
## The currently active SparkContext was created at:
## 
##  ... (same stack frames as the stopped SparkContext above) ...
##          
##  at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:100)
##  at org.apache.spark.SparkContext.cancelAllJobs(SparkContext.scala:2248)
##  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
##  at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
##  at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
##  at java.lang.reflect.Method.invoke(Unknown Source)
##  at sparklyr.Invoke.invoke(invoke.scala:147)
##  at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
##  at sparklyr.StreamHandler.read(stream.scala:66)
##  at sparklyr.BackendHandler.channelRead0(handler.scala:51)
##  at sparklyr.BackendHandler.channelRead0(handler.scala:4)
##  ... (netty channel and event-loop frames, as shown above, omitted) ...
# summarize the model
als_model
## Error in eval(expr, envir, enclos): object 'als_model' not found
summary(als_model)
## Error in summary(als_model): object 'als_model' not found

Next, the plan was to predict ratings for the test set using the ALS factorization model and compute the RMSE; that is not possible here, since the ml_als function call failed.

# function to compute RMSE on a Spark data frame; adapted from the MSE
# example in the sparklyr documentation. Note that it assumes a fitted
# model named als_model exists in the calling environment.
estimate_rmse <- function(df){
    ml_predict(als_model, df) %>%                # score df with the ALS model
        mutate(resid = rating - prediction) %>%  # per-rating residuals
        summarise(rmse = sqrt(mean(resid ^ 2))) %>% 
        collect()
}

# compute rmse for training & test sets
sapply(partitions, estimate_rmse)
## Error in ml_predict(als_model, df): object 'als_model' not found

5 Conclusions

In this project I attempted to develop a recommender system for the MovieLense 100k dataset. The approach was to build a recommender model based on the ALS factorization technique, using the sparklyr package to access the MLlib library on Apache Spark. On the positive side, I was able to load the dataset into a Spark table, do some exploratory analysis, and return the results to RStudio. However, for the most important step, I wasn’t able to build a working model, because the ml_als function call failed with my environment setup.

Some observations and findings from this project include:

  • Complexity of new systems: It was clear from this project that using Apache Spark productively will take a substantial investment of time and effort in new platforms and applications (e.g., sparklyr, Databricks, the MLlib library). This includes becoming familiar with the Spark ecosystem of functions and libraries, learning new syntax and programming logic, reviewing error logs, etc.
  • Stability of new applications: For this project I used a development version of sparklyr (v1.0.1.9004) and a “preview release” of RStudio (v1.2.1555), as suggested on the sparklyr description page. While applications under active development may offer cutting-edge capabilities and new techniques, the risk is that stability suffers, since these applications may not yet be fully tested and integrated with each other.
  • Efficiency in dealing with large datasets: It seems that for large datasets of >10 million ratings, for instance, the advantages of distributed computing should easily outweigh the costs of switching to and learning new application systems. From my last project working with the MovieLense 100k dataset using recommenderlab in RStudio, it was apparent that efficiency limits were close at hand, as some algorithms took on the order of 5-10 minutes to complete. Scaling this up by a factor of 100x (from 100K to 10 million ratings) would imply processing times on the order of 8-16 hours, even under the optimistic assumption that run time scales linearly with the number of ratings.
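
A back-of-envelope check of that extrapolation (a sketch, assuming linear scaling):

# extrapolate 5-10 minutes at 100K ratings to 10 million ratings,
# optimistically assuming run time scales linearly with rating count
scale_factor <- 10e6 / 100e3                  # 100x more ratings
c(low = 5, high = 10) * scale_factor / 60     # ~8.3 to ~16.7 hours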