Setup

Install and load all of the extensions used in this webinar at once, then connect to Spark:

install.packages("spark.sas7bdat")
install.packages("rsparkling")
devtools::install_github("kevinykuo/sparklygraphs")

# choose the Sparkling Water version before rsparkling is loaded
options(rsparkling.sparklingwater.version = "2.1.0")

library(spark.sas7bdat)
library(rsparkling)
library(sparklygraphs)

library(sparklyr)

sc <- spark_connect(master = "local", version = "2.1.0")
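
To confirm the connection, you can ask sparklyr which Spark version it is running against:

# should report 2.1.0 for the connection created above
spark_version(sc)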

Reading SAS files with spark.sas7bdat

spark_read_sas(sc, "inst/data/datetime.sas7bdat", "sas_data")
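
The file is registered as a Spark DataFrame named "sas_data", so it can be queried with dplyr like any other sparklyr table; a minimal sketch:

library(dplyr)
sas_tbl <- tbl(sc, "sas_data")    # reference the registered table
sas_tbl %>% head(5) %>% collect() # bring a few rows back into R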

Using H2O with rsparkling

library(dplyr)
library(h2o)

# copy mtcars into Spark
mtcars_tbl <- copy_to(sc, mtcars, "mtcars", overwrite = TRUE)
# launch the H2O Flow web interface (optional)
h2o_flow(sc, strict_version_check = FALSE)
# convert the Spark DataFrame into an H2OFrame
mtcars_h2o <- as_h2o_frame(sc, mtcars_tbl, strict_version_check = FALSE)

mtcars_glm <- h2o.glm(x = c("wt", "cyl"), 
                      y = "mpg",
                      training_frame = mtcars_h2o,
                      lambda_search = TRUE)
mtcars_glm
## Model Details:
## ==============
## 
## H2ORegressionModel: glm
## Model ID:  GLM_model_R_1502856002417_1 
## GLM Model: summary
##     family     link                              regularization
## 1 gaussian identity Elastic Net (alpha = 0.5, lambda = 0.1013 )
##                                                                lambda_search
## 1 nlambda = 100, lambda.max = 10.132, lambda.min = 0.1013, lambda.1se = -1.0
##   number_of_predictors_total number_of_active_predictors
## 1                          2                           2
##   number_of_iterations                                training_frame
## 1                    0 frame_rdd_36_a919eedd1cdba917bc77b163c73f46d1
## 
## Coefficients: glm coefficients
##       names coefficients standardized_coefficients
## 1 Intercept    38.941654                 20.090625
## 2       cyl    -1.468783                 -2.623132
## 3        wt    -3.034558                 -2.969186
## 
## H2ORegressionMetrics: glm
## ** Reported on training data. **
## 
## MSE:  6.017684
## RMSE:  2.453097
## MAE:  1.940985
## RMSLE:  0.1114801
## Mean Residual Deviance :  6.017684
## R^2 :  0.8289895
## Null Deviance :1126.047
## Null D.o.F. :31
## Residual Deviance :192.5659
## Residual D.o.F. :29
## AIC :156.2425
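
The rsparkling bridge also works in the other direction. A minimal sketch of scoring the model and moving the predictions back into Spark, assuming as_spark_dataframe() accepts the same strict_version_check argument as as_h2o_frame() above:

# score the training frame with the fitted model
mtcars_pred <- h2o.predict(mtcars_glm, mtcars_h2o)
# copy the H2OFrame of predictions back into Spark
pred_tbl <- as_spark_dataframe(sc, mtcars_pred, strict_version_check = FALSE)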

Using GraphFrames with sparklygraphs

highschool_tbl <- copy_to(sc, ggraph::highschool, "highschool", overwrite = TRUE)
# create a vertex table from the distinct source and destination ids
vertices_tbl <- sdf_bind_rows(
  highschool_tbl %>% distinct(from) %>% transmute(id = from),
  highschool_tbl %>% distinct(to) %>% transmute(id = to)
)
# create a table with <source, destination> edges
edges_tbl <- highschool_tbl %>% transmute(src = from, dst = to)
# run pagerank over graph
gf_graphframe(vertices_tbl, edges_tbl) %>%
  gf_pagerank(reset_prob = 0.15, max_iter = 10L, source_id = "1")
## Call: gf_pagerank(., reset_prob = 0.15, max_iter = 10L, source_id = "1")
## 
## Algo parameters:
##   Tolerance: 
##   Reset probability: 0.15
##   Max iterations: 10
##   Source ID: 1
## 
## Result:
## Vertices
## # Source:   table<sparklyr_tmp_4f515b1d2b47> [?? x 2]
## # Database: spark_connection
##       id    pagerank
##    <dbl>       <dbl>
##  1    12 0.012169139
##  2    12 0.012169139
##  3    59 0.001151867
##  4    59 0.001151867
##  5     1 0.155808486
##  6     1 0.155808486
##  7    20 0.035269712
##  8    20 0.035269712
##  9    45 0.023715824
## 10    45 0.023715824
## # ... with more rows
## 
## Edges
## # Source:   table<sparklyr_tmp_4f515804710c> [?? x 3]
## # Database: spark_connection
##      src   dst     weight
##    <dbl> <dbl>      <dbl>
##  1    13     6 0.02777778
##  2    13     6 0.02777778
##  3    13     6 0.02777778
##  4    13     6 0.02777778
##  5    13     6 0.02777778
##  6    13     6 0.02777778
##  7    13     6 0.02777778
##  8    13     6 0.02777778
##  9    13     6 0.02777778
## 10    13     6 0.02777778
## # ... with more rows
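
The result can also be captured for further queries. A sketch of pulling the highest-ranked vertices back into R, assuming gf_vertices() is available to extract the vertex table from a graphframe (as in later releases of the package):

ranked <- gf_graphframe(vertices_tbl, edges_tbl) %>%
  gf_pagerank(reset_prob = 0.15, max_iter = 10L, source_id = "1")
# extract the vertices and keep the five largest pageranks
gf_vertices(ranked) %>%
  arrange(desc(pagerank)) %>%
  head(5) %>%
  collect()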

Extending Spark Data Sources

The sparklyr.defaultPackages config entry adds Spark packages to the connection. Here it pulls in the DataStax Cassandra connector, which spark_read_source() then uses to read a Cassandra table as a custom data source:

library(sparklyr)

config <- spark_config()
config$sparklyr.defaultPackages <- c(
  "datastax:spark-cassandra-connector:2.0.0-RC1-s_2.11"
)

sc <- spark_connect(master = "local", config = config)
spark_read_source(
  sc,
  "emp",
  "org.apache.spark.sql.cassandra",
  list(keyspace = "dev", table = "emp")
)
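
Once registered, the Cassandra table behaves like any other Spark table. A minimal sketch, assuming a running Cassandra instance that contains the dev.emp table referenced above:

library(dplyr)
emp_tbl <- tbl(sc, "emp")  # reference the registered source
emp_tbl %>% count()        # count rows without leaving Spark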

Basic Spark Streaming with invoke()

Basic use of Spark Streaming, as described in the Spark Streaming Programming Guide. sparklyr does not wrap this API directly, so the example drives the JVM from R: invoke_new() constructs a Java object, invoke() calls a method on an object reference, and invoke_static() calls a static method.

# val ssc = new StreamingContext(conf, Seconds(10))

ssc <- invoke_new(
  sc,
  "org.apache.spark.streaming.StreamingContext",
  spark_context(sc),
  invoke_new(
    sc,
    "org.apache.spark.streaming.Duration",
    10000L  # batch interval in milliseconds (ten seconds)
  )
)
# val lines = ssc.socketTextStream("localhost", 9999)

lines <- invoke(
  ssc,
  "socketTextStream",
  "localhost",
  9999L,
  invoke_static(sc, "org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK")
)
# lines.count.print

invoke(lines, "count") %>% invoke("print")
# ssc.start()

invoke(ssc, "start")

Then, from a separate terminal, start a Netcat server on port 9999 and type a few lines to feed the stream:

nc -lk 9999
1
2
3
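
When you are done, the streaming context can be stopped without tearing down the underlying Spark context; a sketch using StreamingContext.stop(stopSparkContext, stopGracefully):

# ssc.stop(false, true)

invoke(ssc, "stop", FALSE, TRUE)

Finally, disconnect from Spark: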
spark_disconnect(sc)