Getting Started with Spark in R

I love dplyr for my daily R analysis tasks. And now there is a contender for working with larger-sized data. Or rather, I should say it will complement dplyr for certain tasks and data. Historically, one of the cons of R has been that everything has to fit in memory. The new Spark R package is probably the most promising for me of all the recent R libraries that try to deal with larger-sized data. As you can tell, I am very excited about Spark in R.

Apache Spark and Databricks has announced that Spark 1.4 includes the SparkR package for analyze large datasets. You can use it in the shell or in the Rstudio console. It works really nicely in Rstudio.

According to the blog post, “SparkR DataFrames support all Spark DataFrame operations including aggregation, filtering, grouping, summary statistics, and other analytical functions. They also supports mixing-in SQL queries, and converting query results to and from DataFrames. Because SparkR uses the Spark’s parallel engine underneath, operations take advantage of multiple cores or multiple machines, and can scale to data sizes much larger than standalone R programs.”

Another nice feature is that you can convert your data to other formats including Apache Parquet, json, and other data storage formats.

This tutorial shows you some of the things you can do in R with Spark using the famous Diamonds dataset as an example. Most of these commands will be very familiar to most R users.

Load in your global environment for Spark.

.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)

You’ll need to create a sqlContext and initialize it.

sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)

The createDataFrame() function will create a Spark Dataframe.

df <- createDataFrame(sqlContext, diamonds)

Let’s look at the Schema for DataFrame.

printSchema(df)
## root
##  |-- carat: double (nullable = true)
##  |-- cut: string (nullable = true)
##  |-- color: string (nullable = true)
##  |-- clarity: string (nullable = true)
##  |-- depth: double (nullable = true)
##  |-- table: double (nullable = true)
##  |-- price: integer (nullable = true)
##  |-- x: double (nullable = true)
##  |-- y: double (nullable = true)
##  |-- z: double (nullable = true)

Let’s look at the first few rows. This will return output in a form R users are used to.

head(df)
##   carat       cut color clarity depth table price    x    y    z
## 1  0.23     Ideal     E     SI2  61.5    55   326 3.95 3.98 2.43
## 2  0.21   Premium     E     SI1  59.8    61   326 3.89 3.84 2.31
## 3  0.23      Good     E     VS1  56.9    65   327 4.05 4.07 2.31
## 4  0.29   Premium     I     VS2  62.4    58   334 4.20 4.23 2.63
## 5  0.31      Good     J     SI2  63.3    58   335 4.34 4.35 2.75
## 6  0.24 Very Good     J    VVS2  62.8    57   336 3.94 3.96 2.48

You can also use showDF() which will give more of a sql type of printout of the data.

showDF(df)
## +-----+---------+-----+-------+-----+-----+-----+----+----+----+
## |carat|      cut|color|clarity|depth|table|price|   x|   y|   z|
## +-----+---------+-----+-------+-----+-----+-----+----+----+----+
## | 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
## | 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
## | 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
## | 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
## | 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
## | 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|
## | 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.95|3.98|2.47|
## | 0.26|Very Good|    H|    SI1| 61.9| 55.0|  337|4.07|4.11|2.53|
## | 0.22|     Fair|    E|    VS2| 65.1| 61.0|  337|3.87|3.78|2.49|
## | 0.23|Very Good|    H|    VS1| 59.4| 61.0|  338| 4.0|4.05|2.39|
## |  0.3|     Good|    J|    SI1| 64.0| 55.0|  339|4.25|4.28|2.73|
## | 0.23|    Ideal|    J|    VS1| 62.8| 56.0|  340|3.93| 3.9|2.46|
## | 0.22|  Premium|    F|    SI1| 60.4| 61.0|  342|3.88|3.84|2.33|
## | 0.31|    Ideal|    J|    SI2| 62.2| 54.0|  344|4.35|4.37|2.71|
## |  0.2|  Premium|    E|    SI2| 60.2| 62.0|  345|3.79|3.75|2.27|
## | 0.32|  Premium|    E|     I1| 60.9| 58.0|  345|4.38|4.42|2.68|
## |  0.3|    Ideal|    I|    SI2| 62.0| 54.0|  348|4.31|4.34|2.68|
## |  0.3|     Good|    J|    SI1| 63.4| 54.0|  351|4.23|4.29| 2.7|
## |  0.3|     Good|    J|    SI1| 63.8| 56.0|  351|4.23|4.26|2.71|
## |  0.3|Very Good|    J|    SI1| 62.7| 59.0|  351|4.21|4.27|2.66|
## +-----+---------+-----+-------+-----+-----+-----+----+----+----+

Return a sampled subset of the DataFrame using a random seed. Note that you cannot performSpark actions on an R DataFrame.

sample <- collect(sample(df, FALSE, 0.5)) 
head(sample)
##   carat       cut color clarity depth table price    x    y    z
## 1  0.23     Ideal     E     SI2  61.5    55   326 3.95 3.98 2.43
## 2  0.21   Premium     E     SI1  59.8    61   326 3.89 3.84 2.31
## 3  0.23      Good     E     VS1  56.9    65   327 4.05 4.07 2.31
## 4  0.24 Very Good     J    VVS2  62.8    57   336 3.94 3.96 2.48
## 5  0.24 Very Good     I    VVS1  62.3    57   336 3.95 3.98 2.47
## 6  0.26 Very Good     H     SI1  61.9    55   337 4.07 4.11 2.53

This will coerce a Spark DataFrame into an R DataFrame.

collected <- collect(df)
head(collected)
##   carat       cut color clarity depth table price    x    y    z
## 1  0.23     Ideal     E     SI2  61.5    55   326 3.95 3.98 2.43
## 2  0.21   Premium     E     SI1  59.8    61   326 3.89 3.84 2.31
## 3  0.23      Good     E     VS1  56.9    65   327 4.05 4.07 2.31
## 4  0.29   Premium     I     VS2  62.4    58   334 4.20 4.23 2.63
## 5  0.31      Good     J     SI2  63.3    58   335 4.34 4.35 2.75
## 6  0.24 Very Good     J    VVS2  62.8    57   336 3.94 3.96 2.48

Takes the first n rows of a DataFrame and return the results as a data.frame.

take(df, 2)
##   carat     cut color clarity depth table price    x    y    z
## 1  0.23   Ideal     E     SI2  61.5    55   326 3.95 3.98 2.43
## 2  0.21 Premium     E     SI1  59.8    61   326 3.89 3.84 2.31

Count the number of rows for each color in the Diamonds dataset using the count() function. The resulting DataFrame will also contain the grouping columns.

count <- count(groupBy(df, "color"))
head(count)
##   color count
## 1     D  6775
## 2     E  9797
## 3     F  9542
## 4     G 11292
## 5     H  8304
## 6     I  5422

The agg() function will return the entire DataFrame without groups.The resulting DataFrame will also contain the grouping columns.

Return the arithmetic mean.

df2 <- agg(df, caratMean = mean(df$carat))
collect(df2)
##   caratMean
## 1 0.7979397

You can also obtain the result without assigning it to a variable. Here, I want the maximum carat.

collect(agg(df, caratMax = max(df$carat)))
##   caratMax
## 1     5.01

Compute the average for all numeric columns grouped by clarity.

avg <- avg(groupBy(df, "clarity"))
collect(avg)
##   clarity AVG(carat) AVG(depth) AVG(table) AVG(price)   AVG(x)   AVG(y) 
## 1     VS1  0.7271582   61.66746   57.31515   3839.455 5.572178 5.581828
## 2     VS2  0.7639346   61.72442   57.41740   3924.989 5.657709 5.658859
## 3     SI1  0.8504822   61.85304   57.66254   3996.001 5.888383 5.888256
## 4     SI2  1.0776485   61.77217   57.92718   5063.029 6.401370 6.397826
## 5      I1  1.2838462   62.73428   58.30378   3924.169 6.761093 6.709379
## 6      IF  0.5051229   61.51061   56.50721   2864.839 4.968402 4.989827
## 7    VVS1  0.5033215   61.62465   56.88446   2523.115 4.960364 4.975075
## 8    VVS2  0.5962021   61.66378   57.02499   3283.737 5.218454 5.232118

Compute the max price and average depth, grouped by carat and clarity.

agg2 <- agg(groupBy(df, "carat", "clarity"), "depth" = "avg", "price" = "max")
head(agg2)
##   carat clarity AVG(depth) MAX(price)
## 1  3.00     SI2   61.60000      16970
## 2  0.34    VVS1   61.55464       1272
## 3  1.29     VS2   61.06667       9243
## 4  1.05      IF   60.81667      13060
## 5  2.48     SI1   62.15000      17893
## 6  0.30     SI2   62.01356        574

Use a selectExpr() function to add a new column with a calculated value and select columns.

exprdf <- selectExpr(df, "clarity", "(price * 5) as newCol")
collect(exprdf)
##       clarity newCol
## 1         SI2   1630
## 2         SI1   1630
## 3         VS1   1635
## 4         VS2   1670
## 5         SI2   1675

Using mutate() will return a new column with calculated values.

newDF <- mutate(df, newCol = df$price * 5)
collect(newDF)
## carat       cut color clarity depth table price     x     y     z  
## 0.23     Ideal     E     SI2  61.5  55.0   326  3.95  3.98  2.43
## 0.21   Premium     E     SI1  59.8  61.0   326  3.89  3.84  2.31  
## 0.29   Premium     I     VS2  62.4  58.0   334  4.20  4.23  2.63
## 0.31      Good     J     SI2  63.3  58.0   335  4.34  4.35  2.75

## newCol
## 1630
## 1630
## 635
## 1670
## 1675

Columns can be renamed.

renamecolDF <- withColumnRenamed(df, "price", "theprice")
head(renamecolDF)
##   carat       cut color clarity depth table theprice    x    y    z
## 1  0.23     Ideal     E     SI2  61.5    55      326 3.95 3.98 2.43
## 2  0.21   Premium     E     SI1  59.8    61      326 3.89 3.84 2.31
## 3  0.23      Good     E     VS1  56.9    65      327 4.05 4.07 2.31
## 4  0.29   Premium     I     VS2  62.4    58      334 4.20 4.23 2.63
## 5  0.31      Good     J     SI2  63.3    58      335 4.34 4.35 2.75
## 6  0.24 Very Good     J    VVS2  62.8    57      336 3.94 3.96 2.48

Sort a DataFrame by the specified column(s).

arrange <- arrange(df, df$cut)
collect(arrange)
##       carat       cut color clarity depth table price     x     y     z
## 1      0.22      Fair     E     VS2  65.1  61.0   337  3.87  3.78  2.49
## 2      0.86      Fair     E     SI2  55.1  69.0  2757  6.45  6.33  3.52
## 3      0.96      Fair     F     SI2  66.3  62.0  2759  6.27  5.95  4.07
## 4      0.70      Fair     F     VS2  64.5  57.0  2762  5.57  5.53  3.58
## 5      0.70      Fair     F     VS2  65.3  55.0  2762  5.63  5.58  3.66

Filter the rows of a DataFrame according to a given condition.

filter <- filter(df, "price < 1000")
head(filter)
##   carat       cut color clarity depth table price    x    y    z
## 1  0.23     Ideal     E     SI2  61.5    55   326 3.95 3.98 2.43
## 2  0.21   Premium     E     SI1  59.8    61   326 3.89 3.84 2.31
## 3  0.23      Good     E     VS1  56.9    65   327 4.05 4.07 2.31
## 4  0.29   Premium     I     VS2  62.4    58   334 4.20 4.23 2.63
## 5  0.31      Good     J     SI2  63.3    58   335 4.34 4.35 2.75
## 6  0.24 Very Good     J    VVS2  62.8    57   336 3.94 3.96 2.48

Summary statistics on a per column basis.

describe <- describe(df$carat)
collect(describe)
##   summary               carat       cut color clarity              depth
## 1   count               53940     53940 53940   53940              53940
## 2    mean  0.7979397478679852      <NA>  <NA>    <NA>  61.74940489432624
## 3  stddev 0.47400685050988317      <NA>  <NA>    <NA> 1.4326080390588254
## 4     min                 0.2      Fair     D      I1               43.0
## 5     max                5.01 Very Good     J    VVS2               79.0
##               table             price                  x
## 1             53940             53940              53940
## 2 57.45718390804603 3932.799721913237  5.731157211716609
## 3 2.234469849982371 3989.402757628873 1.1217503485180496
## 4              43.0               326                0.0
## 5              95.0             18823              10.74
##                    y                  z
## 1              53940              53940
## 2  5.734525954764462 3.5387337782723316
## 3 1.1421240869909135 0.7056923054021117
## 4                0.0                0.0
## 5               58.9               31.8

I hope this helps you get started using Spark in R! There is much more to explore in Spark that has not been covered in this post including joins, sql, converting to Parquet, and more. I encourage you to explore it and I’m sure you will find Spark in R useful on your larger-sized data that doesn’t fit into memory.