Scaling R with Spark

using sparklyr

Javier Luraschi

01/17/2019

1 / 32

What to do when code is slow?

mtcars %>% lm(mpg ~ wt + cyl, .)
2 / 32

What to do when code is slow?

mtcars %>% lm(mpg ~ wt + cyl, .)
# Sample
mtcars %>% dplyr::sample_n(10) %>% lm(mpg ~ wt + cyl, .)
3 / 32

What to do when code is slow?

mtcars %>% lm(mpg ~ wt + cyl, .)
# Sample
mtcars %>% dplyr::sample_n(10) %>% lm(mpg ~ wt + cyl, .)
# Profile
profvis::profvis(mtcars %>% lm(mpg ~ wt + cyl, .))
4 / 32

What to do when code is slow?

mtcars %>% lm(mpg ~ wt + cyl, .)
# Sample
mtcars %>% dplyr::sample_n(10) %>% lm(mpg ~ wt + cyl, .)
# Profile
profvis::profvis(mtcars %>% lm(mpg ~ wt + cyl, .))
# Scale Up
cloudml::cloudml_train("train.R")
5 / 32

What to do when code is slow?

mtcars %>% lm(mpg ~ wt + cyl, .)
# Sample
mtcars %>% dplyr::sample_n(10) %>% lm(mpg ~ wt + cyl, .)
# Profile
profvis::profvis(mtcars %>% lm(mpg ~ wt + cyl, .))
# Scale Up
cloudml::cloudml_train("train.R")
# Scale Out
mtcars_tbl %>% sparklyr::ml_linear_regression(mpg ~ wt + cyl)
6 / 32

What to do when code is slow?

mtcars %>% lm(mpg ~ wt + cyl, .)
# Sample
mtcars %>% dplyr::sample_n(10) %>% lm(mpg ~ wt + cyl, .)
# Profile
profvis::profvis(mtcars %>% lm(mpg ~ wt + cyl, .))
# Scale Up
cloudml::cloudml_train("train.R")
# Scale Out
mtcars_tbl %>% sparklyr::ml_linear_regression(mpg ~ wt + cyl)

Note: There are many more ways to sample, scale up, and scale out.
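
For instance (illustrative, not from the slides): sample a fraction with dplyr, or fit per-group models on local cores with the parallel package before moving to a cluster.

mtcars %>% dplyr::sample_frac(0.5) %>% lm(mpg ~ wt + cyl, .) # Sample a fraction instead of n rows
models <- parallel::mclapply( # Use local cores as a simple scale-up step
  split(mtcars, mtcars$cyl),
  function(df) lm(mpg ~ wt, df)
)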

7 / 32

Scaling Out with R and Spark

# Scale Out
mtcars_tbl %>% sparklyr::ml_linear_regression(mpg ~ wt + cyl)
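
Here mtcars_tbl refers to a copy of mtcars inside Spark. One way to create it, as a minimal sketch assuming a local connection (not shown on the slide):

library(sparklyr)
sc <- spark_connect(master = "local") # Connect to a local Spark instance
mtcars_tbl <- sdf_copy_to(sc, mtcars, overwrite = TRUE) # Copy mtcars into Spark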
8 / 32

How to use R with Spark?

library(sparklyr) # R Interface to Apache Spark
spark_install() # Install Apache Spark
sc <- spark_connect(master = "local") # Connect to Spark cluster
9 / 32

How to use R with Spark?

library(sparklyr) # R Interface to Apache Spark
spark_install() # Install Apache Spark
sc <- spark_connect(master = "local") # Connect to Spark cluster
cars_tbl <- spark_read_csv(sc, "cars", "mtcars/") # Read data in Spark
summarize(cars_tbl, n = n()) # Count records with dplyr
dbGetQuery(sc, "SELECT count(*) FROM cars") # Count records with DBI
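
The "mtcars/" argument above points to a folder of CSV files. For a local test, such a folder could be created with plain R (illustrative, not from the slides):

dir.create("mtcars", showWarnings = FALSE) # Folder that spark_read_csv() will scan
write.csv(mtcars, "mtcars/mtcars.csv", row.names = FALSE) # One CSV file inside it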
10 / 32

How to use R with Spark?

library(sparklyr) # R Interface to Apache Spark
spark_install() # Install Apache Spark
sc <- spark_connect(master = "local") # Connect to Spark cluster
cars_tbl <- spark_read_csv(sc, "cars", "mtcars/") # Read data in Spark
summarize(cars_tbl, n = n()) # Count records with dplyr
dbGetQuery(sc, "SELECT count(*) FROM cars") # Count records with DBI
ml_linear_regression(cars_tbl, mpg ~ wt + cyl) # Perform linear regression
11 / 32

How to use R with Spark?

library(sparklyr) # R Interface to Apache Spark
spark_install() # Install Apache Spark
sc <- spark_connect(master = "local") # Connect to Spark cluster
cars_tbl <- spark_read_csv(sc, "cars", "mtcars/") # Read data in Spark
summarize(cars_tbl, n = n()) # Count records with dplyr
dbGetQuery(sc, "SELECT count(*) FROM cars") # Count records with DBI
ml_linear_regression(cars_tbl, mpg ~ wt + cyl) # Perform linear regression
pipeline <- ml_pipeline(sc) %>% # Define Spark pipeline
  ft_r_formula(mpg ~ wt + cyl) %>% # Add formula transformation
  ml_linear_regression() # Add model to pipeline
fitted <- ml_fit(pipeline, cars_tbl) # Fit pipeline
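
A usage note not on the slide: a fitted pipeline can be saved to disk and reloaded later with sparklyr.

ml_save(fitted, "cars_pipeline", overwrite = TRUE) # Persist the fitted pipeline
reloaded <- ml_load(sc, "cars_pipeline") # Reload it, e.g. in another session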
12 / 32

How to use R with Spark?

library(sparklyr) # R Interface to Apache Spark
spark_install() # Install Apache Spark
sc <- spark_connect(master = "local") # Connect to Spark cluster
cars_tbl <- spark_read_csv(sc, "cars", "mtcars/") # Read data in Spark
summarize(cars_tbl, n = n()) # Count records with dplyr
dbGetQuery(sc, "SELECT count(*) FROM cars") # Count records with DBI
ml_linear_regression(cars_tbl, mpg ~ wt + cyl) # Perform linear regression
pipeline <- ml_pipeline(sc) %>% # Define Spark pipeline
  ft_r_formula(mpg ~ wt + cyl) %>% # Add formula transformation
  ml_linear_regression() # Add model to pipeline
fitted <- ml_fit(pipeline, cars_tbl) # Fit pipeline
spark_context(sc) %>% invoke("version") # Extend sparklyr with Scala
spark_apply(cars_tbl, nrow) # Extend sparklyr with R
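
spark_apply() accepts any R function that maps a data frame to a data frame; a small sketch (not from the slides):

spark_apply(cars_tbl, function(df) { # Runs in R on each partition of cars_tbl
  data.frame(rows = nrow(df), mean_mpg = mean(df$mpg))
})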
13 / 32

What about realtime data?

14 / 32

Spark Structured Streams

cars_str <- stream_read_csv(sc, "mtcars/", "cars") # Read stream in Spark
15 / 32

Spark Structured Streams

cars_str <- stream_read_csv(sc, "mtcars/", "cars") # Read stream in Spark
out_str <- summarize(cars_str, n = n()) # Count records with dplyr
out_str <- dbGetQuery(sc, "SELECT count(*) FROM cars") # Count records with DBI
16 / 32

Spark Structured Streams

cars_str <- stream_read_csv(sc, "mtcars/", "cars") # Read stream in Spark
out_str <- summarize(cars_str, n = n()) # Count records with dplyr
out_str <- dbGetQuery(sc, "SELECT count(*) FROM cars") # Count records with DBI
out_str <- ml_transform(fitted, cars_str) # Apply pipeline to stream
17 / 32

Spark Structured Streams

cars_str <- stream_read_csv(sc, "mtcars/", "cars") # Read stream in Spark
out_str <- summarize(cars_str, n = n()) # Count records with dplyr
out_str <- dbGetQuery(sc, "SELECT count(*) FROM cars") # Count records with DBI
out_str <- ml_transform(fitted, cars_str) # Apply pipeline to stream
out_str <- spark_apply(cars_str, nrow) # Extend streams with R
18 / 32

Spark Structured Streams

cars_str <- stream_read_csv(sc, "mtcars/", "cars") # Read stream in Spark
out_str <- summarize(cars_str, n = n()) # Count records with dplyr
out_str <- dbGetQuery(sc, "SELECT count(*) FROM cars") # Count records with DBI
out_str <- ml_transform(fitted, cars_str) # Apply pipeline to stream
out_str <- spark_apply(cars_str, nrow) # Extend streams with R
stream_write_csv(out_str, "output/") # Write as a CSV stream
reactiveSpark(out_str) # Use as a Shiny reactive
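
A minimal Shiny sketch (not from the slides), assuming sc and the out_str stream defined above:

library(shiny)

ui <- fluidPage(tableOutput("counts"))

server <- function(input, output, session) {
  counts <- reactiveSpark(out_str) # Re-collects out_str whenever the stream updates
  output$counts <- renderTable(counts())
}

shinyApp(ui, server)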
19 / 32

Streaming with Spark, Kafka and Shiny

Apache Kafka is an open-source stream-processing platform that provides unified, high-throughput, low-latency handling of real-time data feeds.
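
As an illustrative sketch (not from the slides), a Kafka topic can be consumed and produced as a Spark stream from R; the broker address and topic names below are placeholders:

library(sparklyr)
sc <- spark_connect(master = "local", version = "2.3", packages = "kafka")

cars_kafka <- stream_read_kafka( # Read the "cars" topic as a structured stream
  sc,
  options = list(
    kafka.bootstrap.servers = "localhost:9092",
    subscribe = "cars"
  )
)

stream_write_kafka( # Write the stream back to another topic
  cars_kafka,
  options = list(
    kafka.bootstrap.servers = "localhost:9092",
    topic = "cars-out"
  )
)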

20 / 32

What's new in Spark and R?

Streams, MLeap, Kubernetes and RStudio 1.2 integration.
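
For example, a sketch of connecting to Spark running on Kubernetes (illustrative; the API server address and container image are placeholders, and a Spark installation built with Kubernetes support is assumed):

library(sparklyr)

conf <- spark_config()
conf$spark.kubernetes.container.image <- "<spark-image-with-r>"
conf$spark.executor.instances <- 2

sc <- spark_connect(
  master = "k8s://https://<kubernetes-api-server>:443",
  config = conf
)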

22 / 32

What are we currently working on?

- Faster data transfer with Apache Arrow: pull/1611.

24 / 32

What are we currently working on?

- Faster data transfer with Apache Arrow: pull/1611.

- XGBoost on Spark: rstudio/sparkxgb.

25 / 32

Arrow on Spark

Arrow is a cross-language development platform for in-memory data.

26 / 32

Arrow on Spark

Arrow is a cross-language development platform for in-memory data.

devtools::install_github("apache/arrow", subdir = "r", ref = "dc5df8f")
devtools::install_github("rstudio/sparklyr")
library(arrow)
library(sparklyr)
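
A minimal sketch of where Arrow helps (not from the slides): once the arrow package is loaded, data transfer in copy_to() and spark_apply() uses Arrow automatically.

sc <- spark_connect(master = "local")

large_tbl <- sdf_copy_to(sc, data.frame(y = runif(1e6)), overwrite = TRUE) # Faster transfer with Arrow
spark_apply(large_tbl, nrow) # Row serialization for spark_apply() also uses Arrow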
27 / 32

XGBoost on Spark

sparkxgb is a sparklyr extension that provides an interface to XGBoost on Spark.

29 / 32

XGBoost on Spark

sparkxgb is a sparklyr extension that provides an interface to XGBoost on Spark.

devtools::install_github("rstudio/sparkxgb")
library(sparkxgb)
30 / 32

XGBoost on Spark

sparkxgb is a sparklyr extension that provides an interface to XGBoost on Spark.

devtools::install_github("rstudio/sparkxgb")
library(sparkxgb)
iris_tbl <- sdf_copy_to(sc, iris)
xgb_model <- xgboost_classifier(
  iris_tbl,
  Species ~ .,
  objective = "multi:softprob",
  num_class = 3,
  num_round = 50,
  max_depth = 4
)
xgb_model %>% ml_predict(iris_tbl) %>% glimpse()
31 / 32

Thank you!

Blog: blog.rstudio.com/tags/sparklyr

R Help: community.rstudio.com

Spark Help: stackoverflow.com/tags/sparklyr

Issues: github.com/rstudio/sparklyr/issues

Chat: gitter.im/rstudio.sparklyr

32 / 32
