I love dplyr for my daily R analysis tasks, and now there is a contender for working with larger data. Or rather, I should say a complement to dplyr for certain tasks and datasets. Historically, one of R's drawbacks has been that everything must fit in memory. Of the recent R libraries that try to work around that limit, the new SparkR package is the most promising to me. As you can tell, I am excited about Spark in R.
Databricks has announced that Apache Spark 1.4 includes the SparkR package for analyzing large datasets. You can use it from the SparkR shell or from the RStudio console, and it works really nicely in RStudio.
According to the blog post, “SparkR DataFrames support all Spark DataFrame operations including aggregation, filtering, grouping, summary statistics, and other analytical functions. They also support mixing in SQL queries, and converting query results to and from DataFrames. Because SparkR uses Spark’s parallel engine underneath, operations take advantage of multiple cores or multiple machines, and can scale to data sizes much larger than standalone R programs.”
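To give a flavor of the SQL side, here is a minimal sketch of registering a DataFrame as a temporary table and querying it. It assumes the sqlContext and the diamonds DataFrame df that are created in the setup code below; the table name diamonds_tbl is just an example.
# Register the DataFrame as a temporary table, then mix in a SQL query
# (df and sqlContext come from the setup code below)
registerTempTable(df, "diamonds_tbl")
avgPriceByCut <- sql(sqlContext, "SELECT cut, AVG(price) AS avg_price FROM diamonds_tbl GROUP BY cut")
head(avgPriceByCut)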
Another nice feature is that you can save your data in other storage formats, including Apache Parquet and JSON.
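As a minimal sketch of that, assuming the df and sqlContext created in the setup code below and an example output path of diamonds.parquet, saving to Parquet and reading it back looks like this:
# Write the DataFrame out as Parquet and load it back into a new DataFrame
# (the path is only an example)
write.df(df, path = "diamonds.parquet", source = "parquet", mode = "overwrite")
diamondsParquet <- read.df(sqlContext, "diamonds.parquet", source = "parquet")
printSchema(diamondsParquet)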
This tutorial shows some of the things you can do with Spark in R, using the well-known diamonds dataset as an example. Most of these commands will feel familiar to R users.
# Make the SparkR package shipped with Spark visible to R, then load it
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
library(ggplot2)  # provides the diamonds data frame
# Start a local Spark context and a SQL context
sc <- sparkR.init(master = "local")
sqlContext <- sparkRSQL.init(sc)
# Convert the local data frame into a Spark DataFrame
df <- createDataFrame(sqlContext, diamonds)
printSchema(df)
## root
## |-- carat: double (nullable = true)
## |-- cut: string (nullable = true)
## |-- color: string (nullable = true)
## |-- clarity: string (nullable = true)
## |-- depth: double (nullable = true)
## |-- table: double (nullable = true)
## |-- price: integer (nullable = true)
## |-- x: double (nullable = true)
## |-- y: double (nullable = true)
## |-- z: double (nullable = true)
head(df)
## carat cut color clarity depth table price x y z
## 1 0.23 Ideal E SI2 61.5 55 326 3.95 3.98 2.43
## 2 0.21 Premium E SI1 59.8 61 326 3.89 3.84 2.31
## 3 0.23 Good E VS1 56.9 65 327 4.05 4.07 2.31
## 4 0.29 Premium I VS2 62.4 58 334 4.20 4.23 2.63
## 5 0.31 Good J SI2 63.3 58 335 4.34 4.35 2.75
## 6 0.24 Very Good J VVS2 62.8 57 336 3.94 3.96 2.48
showDF(df)
## +-----+---------+-----+-------+-----+-----+-----+----+----+----+
## |carat| cut|color|clarity|depth|table|price| x| y| z|
## +-----+---------+-----+-------+-----+-----+-----+----+----+----+
## | 0.23| Ideal| E| SI2| 61.5| 55.0| 326|3.95|3.98|2.43|
## | 0.21| Premium| E| SI1| 59.8| 61.0| 326|3.89|3.84|2.31|
## | 0.23| Good| E| VS1| 56.9| 65.0| 327|4.05|4.07|2.31|
## | 0.29| Premium| I| VS2| 62.4| 58.0| 334| 4.2|4.23|2.63|
## | 0.31| Good| J| SI2| 63.3| 58.0| 335|4.34|4.35|2.75|
## | 0.24|Very Good| J| VVS2| 62.8| 57.0| 336|3.94|3.96|2.48|
## | 0.24|Very Good| I| VVS1| 62.3| 57.0| 336|3.95|3.98|2.47|
## | 0.26|Very Good| H| SI1| 61.9| 55.0| 337|4.07|4.11|2.53|
## | 0.22| Fair| E| VS2| 65.1| 61.0| 337|3.87|3.78|2.49|
## | 0.23|Very Good| H| VS1| 59.4| 61.0| 338| 4.0|4.05|2.39|
## | 0.3| Good| J| SI1| 64.0| 55.0| 339|4.25|4.28|2.73|
## | 0.23| Ideal| J| VS1| 62.8| 56.0| 340|3.93| 3.9|2.46|
## | 0.22| Premium| F| SI1| 60.4| 61.0| 342|3.88|3.84|2.33|
## | 0.31| Ideal| J| SI2| 62.2| 54.0| 344|4.35|4.37|2.71|
## | 0.2| Premium| E| SI2| 60.2| 62.0| 345|3.79|3.75|2.27|
## | 0.32| Premium| E| I1| 60.9| 58.0| 345|4.38|4.42|2.68|
## | 0.3| Ideal| I| SI2| 62.0| 54.0| 348|4.31|4.34|2.68|
## | 0.3| Good| J| SI1| 63.4| 54.0| 351|4.23|4.29| 2.7|
## | 0.3| Good| J| SI1| 63.8| 56.0| 351|4.23|4.26|2.71|
## | 0.3|Very Good| J| SI1| 62.7| 59.0| 351|4.21|4.27|2.66|
## +-----+---------+-----+-------+-----+-----+-----+----+----+----+
sample <- collect(sample(df, FALSE, 0.5))
head(sample)
## carat cut color clarity depth table price x y z
## 1 0.23 Ideal E SI2 61.5 55 326 3.95 3.98 2.43
## 2 0.21 Premium E SI1 59.8 61 326 3.89 3.84 2.31
## 3 0.23 Good E VS1 56.9 65 327 4.05 4.07 2.31
## 4 0.24 Very Good J VVS2 62.8 57 336 3.94 3.96 2.48
## 5 0.24 Very Good I VVS1 62.3 57 336 3.95 3.98 2.47
## 6 0.26 Very Good H SI1 61.9 55 337 4.07 4.11 2.53
collected <- collect(df)
head(collected)
## carat cut color clarity depth table price x y z
## 1 0.23 Ideal E SI2 61.5 55 326 3.95 3.98 2.43
## 2 0.21 Premium E SI1 59.8 61 326 3.89 3.84 2.31
## 3 0.23 Good E VS1 56.9 65 327 4.05 4.07 2.31
## 4 0.29 Premium I VS2 62.4 58 334 4.20 4.23 2.63
## 5 0.31 Good J SI2 63.3 58 335 4.34 4.35 2.75
## 6 0.24 Very Good J VVS2 62.8 57 336 3.94 3.96 2.48
take(df, 2)
## carat cut color clarity depth table price x y z
## 1 0.23 Ideal E SI2 61.5 55 326 3.95 3.98 2.43
## 2 0.21 Premium E SI1 59.8 61 326 3.89 3.84 2.31
count <- count(groupBy(df, "color"))
head(count)
## color count
## 1 D 6775
## 2 E 9797
## 3 F 9542
## 4 G 11292
## 5 H 8304
## 6 I 5422
df2 <- agg(df, caratMean = mean(df$carat))
collect(df2)
## caratMean
## 1 0.7979397
collect(agg(df, caratMax = max(df$carat)))
## caratMax
## 1 5.01
avg <- avg(groupBy(df, "clarity"))
collect(avg)
## clarity AVG(carat) AVG(depth) AVG(table) AVG(price) AVG(x) AVG(y)
## 1 VS1 0.7271582 61.66746 57.31515 3839.455 5.572178 5.581828
## 2 VS2 0.7639346 61.72442 57.41740 3924.989 5.657709 5.658859
## 3 SI1 0.8504822 61.85304 57.66254 3996.001 5.888383 5.888256
## 4 SI2 1.0776485 61.77217 57.92718 5063.029 6.401370 6.397826
## 5 I1 1.2838462 62.73428 58.30378 3924.169 6.761093 6.709379
## 6 IF 0.5051229 61.51061 56.50721 2864.839 4.968402 4.989827
## 7 VVS1 0.5033215 61.62465 56.88446 2523.115 4.960364 4.975075
## 8 VVS2 0.5962021 61.66378 57.02499 3283.737 5.218454 5.232118
agg2 <- agg(groupBy(df, "carat", "clarity"), "depth" = "avg", "price" = "max")
head(agg2)
## carat clarity AVG(depth) MAX(price)
## 1 3.00 SI2 61.60000 16970
## 2 0.34 VVS1 61.55464 1272
## 3 1.29 VS2 61.06667 9243
## 4 1.05 IF 60.81667 13060
## 5 2.48 SI1 62.15000 17893
## 6 0.30 SI2 62.01356 574
exprdf <- selectExpr(df, "clarity", "(price * 5) as newCol")
collect(exprdf)
## clarity newCol
## 1 SI2 1630
## 2 SI1 1630
## 3 VS1 1635
## 4 VS2 1670
## 5 SI2 1675
newDF <- mutate(df, newCol = df$price * 5)
collect(newDF)
## carat cut color clarity depth table price x y z newCol
## 1 0.23 Ideal E SI2 61.5 55 326 3.95 3.98 2.43 1630
## 2 0.21 Premium E SI1 59.8 61 326 3.89 3.84 2.31 1630
## 3 0.23 Good E VS1 56.9 65 327 4.05 4.07 2.31 1635
## 4 0.29 Premium I VS2 62.4 58 334 4.20 4.23 2.63 1670
## 5 0.31 Good J SI2 63.3 58 335 4.34 4.35 2.75 1675
renamecolDF <- withColumnRenamed(df, "price", "theprice")
head(renamecolDF)
## carat cut color clarity depth table theprice x y z
## 1 0.23 Ideal E SI2 61.5 55 326 3.95 3.98 2.43
## 2 0.21 Premium E SI1 59.8 61 326 3.89 3.84 2.31
## 3 0.23 Good E VS1 56.9 65 327 4.05 4.07 2.31
## 4 0.29 Premium I VS2 62.4 58 334 4.20 4.23 2.63
## 5 0.31 Good J SI2 63.3 58 335 4.34 4.35 2.75
## 6 0.24 Very Good J VVS2 62.8 57 336 3.94 3.96 2.48
arrange <- arrange(df, df$cut)
collect(arrange)
## carat cut color clarity depth table price x y z
## 1 0.22 Fair E VS2 65.1 61.0 337 3.87 3.78 2.49
## 2 0.86 Fair E SI2 55.1 69.0 2757 6.45 6.33 3.52
## 3 0.96 Fair F SI2 66.3 62.0 2759 6.27 5.95 4.07
## 4 0.70 Fair F VS2 64.5 57.0 2762 5.57 5.53 3.58
## 5 0.70 Fair F VS2 65.3 55.0 2762 5.63 5.58 3.66
filter <- filter(df, "price < 1000")
head(filter)
## carat cut color clarity depth table price x y z
## 1 0.23 Ideal E SI2 61.5 55 326 3.95 3.98 2.43
## 2 0.21 Premium E SI1 59.8 61 326 3.89 3.84 2.31
## 3 0.23 Good E VS1 56.9 65 327 4.05 4.07 2.31
## 4 0.29 Premium I VS2 62.4 58 334 4.20 4.23 2.63
## 5 0.31 Good J SI2 63.3 58 335 4.34 4.35 2.75
## 6 0.24 Very Good J VVS2 62.8 57 336 3.94 3.96 2.48
describe <- describe(df)
collect(describe)
## summary carat cut color clarity depth
## 1 count 53940 53940 53940 53940 53940
## 2 mean 0.7979397478679852 <NA> <NA> <NA> 61.74940489432624
## 3 stddev 0.47400685050988317 <NA> <NA> <NA> 1.4326080390588254
## 4 min 0.2 Fair D I1 43.0
## 5 max 5.01 Very Good J VVS2 79.0
## table price x
## 1 53940 53940 53940
## 2 57.45718390804603 3932.799721913237 5.731157211716609
## 3 2.234469849982371 3989.402757628873 1.1217503485180496
## 4 43.0 326 0.0
## 5 95.0 18823 10.74
## y z
## 1 53940 53940
## 2 5.734525954764462 3.5387337782723316
## 3 1.1421240869909135 0.7056923054021117
## 4 0.0 0.0
## 5 58.9 31.8
I hope this helps you get started using Spark in R! There is much more to explore in Spark than this post covers, including joins, the SQL interface, converting to Parquet, and more. I encourage you to explore it, and I’m sure you will find SparkR useful for larger data that doesn’t fit in memory.