The goal of this project is to practice working with a distributed recommender system.
For this project, we will work with the MovieLense 100k dataset of movie ratings, the same dataset used in Project 4. The dataset includes about 100,000 ratings of 1,664 movies by 943 users. Ratings are given as real values ranging from 1 (worst) to 5 (best).
In Project 4 we used the recommenderlab package to develop and test recommender systems based on several recommendation algorithms. In this project, we will instead develop a recommender system using the alternating least squares (ALS) matrix factorization method from the MLlib package in Apache Spark, accessing Spark through the sparklyr package in local mode.
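For reference, ALS factors the sparse ratings matrix into low-rank user and item factor matrices by minimizing the regularized squared error over the observed ratings, alternating between closed-form solves for the user factors and the item factors:

$$\min_{U,V} \sum_{(u,i)\ \mathrm{observed}} \left( r_{ui} - u_u^{\top} v_i \right)^2 + \lambda \left( \sum_u \lVert u_u \rVert^2 + \sum_i \lVert v_i \rVert^2 \right)$$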
library(tidyverse)
library(recommenderlab)
library(knitr)
library(sparklyr)
library(dplyr)
We start by loading the data and preparing it for the recommender system. The full ratings matrix includes 99,392 ratings out of 943 × 1,664 = 1,569,152 possible user-movie pairs, which means the ratings matrix is roughly 94% sparse.
# load data
data("MovieLense")
m <- MovieLense
# examine the ratings matrix
head(colnames(m), 10)
## [1] "Toy Story (1995)"
## [2] "GoldenEye (1995)"
## [3] "Four Rooms (1995)"
## [4] "Get Shorty (1995)"
## [5] "Copycat (1995)"
## [6] "Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)"
## [7] "Twelve Monkeys (1995)"
## [8] "Babe (1995)"
## [9] "Dead Man Walking (1995)"
## [10] "Richard III (1995)"
getRatingMatrix(m)[1:10, 1:30]
## 10 x 30 sparse Matrix of class "dgCMatrix"
##
## 1 5 3 4 3 3 5 4 1 5 3 2 5 5 5 5 5 3 4 5 4 1 4 4 3 4 3 2 4 1 3
## 2 4 . . . . . . . . 2 . . 4 4 . . . . 3 . . . . . 4 . . . . .
## 3 . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
## 4 . . . . . . . . . . 4 . . . . . . . . . . . . . . . . . . .
## 5 4 3 . . . . . . . . . . . . . . 4 . . . 3 . . 4 3 . . . 4 .
## 6 4 . . . . . 2 4 4 . . 4 2 5 3 . . . 4 . 3 3 4 . . . . 2 . .
## 7 . . . 5 . . 5 5 5 4 3 5 . . . . . . . . . 5 3 . 3 . 4 5 3 .
## 8 . . . . . . 3 . . . 3 . . . . . . . . . . 5 . . . . . . . .
## 9 . . . . . 5 4 . . . . . . . . . . . . . . . . . . . . . . .
## 10 4 . . 4 . . 4 . 4 . 4 5 3 . . 4 . . . . . 5 5 . . . . . . .
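As a quick check on the sparsity figure quoted above (an illustrative snippet, not part of the original output):

# sanity check: compare observed ratings to all possible user-movie pairs
n_ratings <- length(getRatings(m))  # number of observed ratings (99,392)
n_pairs <- nrow(m) * ncol(m)        # 943 users x 1,664 movies = 1,569,152 pairs
round(1 - n_ratings / n_pairs, 3)   # ~0.937, i.e. roughly 94% sparse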
In order to use the ALS matrix factorization function in the Apache Spark MLlib package, we need to recast the ratings matrix into a dataframe with numeric (integer) values for user and item IDs. In the process we also save the actual movie names in case we need them later.
# recast ratings matrix into dataframe with numeric user & item IDs
movie_names <- colnames(m)
colnames(m) <- 1:ncol(m)
df <- as(m, "data.frame")
df$user <- as.integer(df$user)
df$item <- as.integer(df$item)
df$rating <- as.integer(df$rating)
str(df)
## 'data.frame': 99392 obs. of 3 variables:
## $ user : int 1 1 1 1 1 1 1 1 1 1 ...
## $ item : int 1 777 888 999 1110 1221 1332 1443 1554 2 ...
## $ rating: int 5 3 4 3 3 5 4 1 5 3 ...
# view the dataframe
head(df)
## user item rating
## 1 1 1 5
## 453 1 777 3
## 584 1 888 4
## 674 1 999 3
## 883 1 1110 3
## 969 1 1221 5
tail(df)
## user item rating
## 93643 938 45 2
## 94227 938 67 2
## 94450 938 75 4
## 96451 938 201 3
## 97147 938 246 3
## 98112 938 359 3
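One caveat on the conversion above (an observation, not part of the original analysis): as.integer() applied to a factor returns the underlying level codes, not the label values. Since the relabeled column names "1" through "1664" sort lexicographically when the factor levels are built, the resulting item IDs form a consistent one-to-one mapping but do not match the original column positions; mapping IDs back to movie_names would therefore require converting through character first. A minimal illustration:

# as.integer() on a factor yields level codes, not label values
f <- factor(c("1", "10", "2"))   # levels sort lexicographically: "1" < "10" < "2"
as.integer(f)                    # 1 2 3 -- the level codes
as.integer(as.character(f))      # 1 10 2 -- the actual numeric labels

The scrambled codes are harmless for ALS, which only needs stable integer IDs, but they matter for any later lookup against movie_names.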
Next we establish a connection to Apache Spark in local mode, and copy the dataframe to a Spark table.
sc <- spark_connect(master = "local")
ratings_tbl <- copy_to(sc, df, "ratings", overwrite = TRUE)
src_tbls(sc)
## [1] "ratings"
class(ratings_tbl)
## [1] "tbl_spark" "tbl_sql" "tbl_lazy" "tbl"
glimpse(ratings_tbl)
## Observations: ??
## Variables: 3
## Database: spark_connection
## $ user <int> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1...
## $ item <int> 1, 777, 888, 999, 1110, 1221, 1332, 1443, 1554, 2, 113,...
## $ rating <int> 5, 3, 4, 3, 3, 5, 4, 1, 5, 3, 2, 5, 5, 5, 5, 5, 3, 4, 5...
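Note that spark_connect() also accepts a version argument, which pins the connection to a specific local Spark installation. A sketch (assumes the corresponding version was installed beforehand with spark_install()):

# alternative: pin the local Spark version explicitly
# spark_install(version = "2.3")
# sc <- spark_connect(master = "local", version = "2.3")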
Now we review the ratings and plot the distribution of rating counts and average ratings, per user and per movie. Unfortunately, the code below shows some instability: it occasionally fails at the collect() function calls. This problem retrieving query results from Apache Spark may be attributable to integration issues between RStudio and Spark; when run from a plain R script, the same code completes and produces the histograms without fail.
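One possible mitigation (a hypothetical workaround, not used in the original run) is to wrap collect() in a small retry helper:

# hypothetical helper: retry collect() a few times before giving up
collect_retry <- function(tbl, tries = 3) {
  for (i in seq_len(tries)) {
    result <- tryCatch(dplyr::collect(tbl), error = function(e) NULL)
    if (!is.null(result)) return(result)
    Sys.sleep(1)  # brief pause before retrying
  }
  stop("collect() failed after ", tries, " attempts")
}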
# distribution of counts & avg rating per user
by_user <- ratings_tbl %>%
group_by(user) %>%
summarise(count = n(), avg = mean(rating, na.rm = TRUE)) %>%
collect()
head(by_user)
## # A tibble: 6 x 3
## user count avg
## <int> <dbl> <dbl>
## 1 556 208 3.64
## 2 13 180 3.46
## 3 46 98 4.09
## 4 101 19 3.63
## 5 135 127 3.37
## 6 157 68 4.32
ggplot(by_user, aes(count)) + geom_histogram() +
labs(title = "Distribution of Rating Count per User",
x = "Rating Count")
ggplot(by_user, aes(avg)) + geom_histogram() +
labs(title = "Distribution of Average Rating per User",
x = "Average Rating")
summary(by_user)
## user count avg
## Min. : 1.0 Min. : 19.0 Min. :1.497
## 1st Qu.:236.5 1st Qu.: 32.0 1st Qu.:3.325
## Median :472.0 Median : 64.0 Median :3.619
## Mean :472.0 Mean :105.4 Mean :3.588
## 3rd Qu.:707.5 3rd Qu.:147.5 3rd Qu.:3.871
## Max. :943.0 Max. :735.0 Max. :4.870
# distribution of counts & avg rating per movie
by_item <- ratings_tbl %>%
group_by(item) %>%
summarise(count = n(), avg = mean(rating, na.rm = TRUE)) %>%
collect()
head(by_item)
## # A tibble: 6 x 3
## item count avg
## <int> <dbl> <dbl>
## 1 777 131 3.21
## 2 999 209 3.55
## 3 446 183 3.97
## 4 668 39 3.21
## 5 766 69 3.96
## 6 822 174 3.45
ggplot(by_item, aes(count)) + geom_histogram() +
labs(title = "Distribution of Rating Count per Movie",
x = "Rating Count")
ggplot(by_item, aes(avg)) + geom_histogram() +
labs(title = "Distribution of Avg. Rating per Movie",
x = "Average Rating")
summary(by_item)
## item count avg
## Min. : 1.0 Min. : 1.00 Min. :1.000
## 1st Qu.: 416.8 1st Qu.: 7.00 1st Qu.:2.665
## Median : 832.5 Median : 27.00 Median :3.162
## Mean : 832.5 Mean : 59.73 Mean :3.077
## 3rd Qu.:1248.2 3rd Qu.: 80.00 3rd Qu.:3.653
## Max. :1664.0 Max. :583.00 Max. :5.000
We will develop a recommender model based on the alternating least squares (ALS) algorithm using the ml_als function in sparklyr. We follow these steps:

- Partition the ratings into a training set (partitions$training, 80%) and a test set (partitions$test, 20%), as shown below.
- Fit the ALS model on the training set.
- Predict ratings for the test set and compute the RMSE.

However, we can see below that the ml_als function call to Apache Spark runs into problems, and Spark ends up terminating the process. I tried the ml_als_factorization function referenced in this week's class materials, but that function was deprecated in an earlier version of sparklyr. I also logged a new issue on the sparklyr issues page; the developer's response was to try installing a different version of the local Spark application. I tried Spark versions 2.4.3 (the current version), 2.3, 2.2, and 2.1, but unfortunately the ml_als call failed with all of them.
# partition the ratings
partitions <- ratings_tbl %>%
sdf_random_split(training = 0.8, test = 0.2, seed = 314159)
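As a sanity check (illustrative; not shown in the original output), sdf_nrow() can confirm that the split is roughly 80/20:

# check partition sizes (expect roughly 79,500 / 19,900 rows)
sdf_nrow(partitions$training)
sdf_nrow(partitions$test)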
# fit the model on the training set
als_model <- partitions$training %>%
ml_als(rating ~ user + item)
## Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 35.0 failed 1 times, most recent failure: Lost task 0.0 in stage 35.0 (TID 181, localhost, executor driver): java.lang.StackOverflowError
## at java.io.ObjectInputStream$BlockDataInputStream.readByte(Unknown Source)
## at java.io.ObjectInputStream.readHandle(Unknown Source)
## at java.io.ObjectInputStream.readClassDesc(Unknown Source)
## at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
## at java.io.ObjectInputStream.readObject0(Unknown Source)
## at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
## at java.io.ObjectInputStream.readSerialData(Unknown Source)
## at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
## at java.io.ObjectInputStream.readObject0(Unknown Source)
## at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
## at java.io.ObjectInputStream.readSerialData(Unknown Source)
## at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
## at java.io.ObjectInputStream.readObject0(Unknown Source)
## at java.io.ObjectInputStream.readObject(Unknown Source)
## at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
##   ... [the serialization frames above repeat until the stack overflows; output truncated]
## Error: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
## This stopped SparkContext was created at:
##
## org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
## sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
## sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
## sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
## java.lang.reflect.Method.invoke(Unknown Source)
## sparklyr.Invoke.invoke(invoke.scala:147)
## sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
## sparklyr.StreamHandler.read(stream.scala:66)
## sparklyr.BackendHandler.channelRead0(handler.scala:51)
## sparklyr.BackendHandler.channelRead0(handler.scala:4)
## io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
## io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
## io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
## io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
## io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
## io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
## io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
## io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
## io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
## io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
##
## The currently active SparkContext was created at:
##
## [identical call stack as above; omitted]
##
## at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:100)
## at org.apache.spark.SparkContext.cancelAllJobs(SparkContext.scala:2248)
## at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
## at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
## at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
## at java.lang.reflect.Method.invoke(Unknown Source)
## at sparklyr.Invoke.invoke(invoke.scala:147)
## at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
## at sparklyr.StreamHandler.read(stream.scala:66)
## at sparklyr.BackendHandler.channelRead0(handler.scala:51)
## at sparklyr.BackendHandler.channelRead0(handler.scala:4)
## [netty event-loop frames omitted]
# summarize the model
als_model
## Error in eval(expr, envir, enclos): object 'als_model' not found
summary(als_model)
## Error in summary(als_model): object 'als_model' not found
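Had the fit succeeded, sparklyr also provides ml_recommend() for producing top-N recommendations directly from an ALS model. A sketch (left unevaluated, since the fit above failed):

# top_5 <- ml_recommend(als_model, type = "items", n = 5)  # top-5 movies per user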
Next, the plan was to predict ratings for the test set with the fitted ALS model and compute the RMSE. Since the ml_als call failed, this step could not be carried out, but the evaluation code is included below for reference.
# function to compute RMSE; adapted from the MSE example in the
# sparklyr documentation
estimate_rmse <- function(df){
ml_predict(als_model, df) %>%
mutate(resid = rating - prediction) %>%
summarize(rmse = sqrt(mean(resid ^ 2))) %>%
collect()
}
# compute rmse for training & test sets
sapply(partitions, estimate_rmse)
## Error in ml_predict(als_model, df): object 'als_model' not found
In this project I attempted to develop a recommender system for the MovieLense 100k dataset. The approach was to build a recommender model based on the ALS matrix factorization technique, using the sparklyr package to access the MLlib library on Apache Spark. On the positive side, I was able to load the dataset into a Spark table, do some exploratory analysis, and return the results to RStudio. However, for the most important part, I was not able to build a working model, because the ml_als function call failed in my environment.
Some observations and findings from this project include:
- Working with a distributed platform involves a substantial learning curve across a new set of tools (Apache Spark, sparklyr, Databricks, MLlib library). This includes becoming familiar with the Spark ecosystem of functions and libraries, learning new syntax and programming logic, reviewing error logs, etc.
- I used a development version of sparklyr (v1.0.1.9004) and a "preview release" of RStudio (v1.2.1555), as suggested on the sparklyr description page. While working with applications under development may offer cutting-edge capabilities and new techniques, the risk is that stability may suffer, as these applications may not be fully stabilized, tested, and integrated with each other.
- When working with the MovieLense 100k dataset using recommenderlab in RStudio, it was apparent that efficiency limits were close at hand, as some algorithms took on the order of 5-10 minutes to complete. Scaling this up by a factor of 100x (from 100k to 10 million ratings) would, at linear scaling, imply processing times on the order of 8-16 hours (100 × 5-10 minutes).