Install and load all of the extensions used in this webinar, then connect to Spark:
install.packages("spark.sas7bdat")
install.packages("rsparkling")
devtools::install_github("kevinykuo/sparklygraphs")
options(rsparkling.sparklingwater.version = "2.1.0")
library(spark.sas7bdat)
library(rsparkling)
library(sparklygraphs)
library(sparklyr)
sc <- spark_connect(master = "local", version = "2.1.0")
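Optionally, we can sanity-check the connection before going further; one way is to ask sparklyr for the Spark version it connected to:
spark_version(sc)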
spark.sas7bdat
The spark.sas7bdat extension reads a SAS dataset (.sas7bdat file) directly into Spark and registers it as a table:
spark_read_sas(sc, "inst/data/datetime.sas7bdat", "sas_data")
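The registered table behaves like any other Spark table. A minimal sketch to preview its first rows, assuming the sas_data table created above:
sas_tbl <- dplyr::tbl(sc, "sas_data")
dplyr::collect(head(sas_tbl, 5))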
rsparkling
The rsparkling extension exposes H2O's machine learning algorithms to Spark data via sparklyr:
library(dplyr)
library(h2o)
mtcars_tbl <- copy_to(sc, mtcars, "mtcars", overwrite = TRUE)
h2o_flow(sc, strict_version_check = FALSE)
mtcars_h2o <- as_h2o_frame(sc, mtcars_tbl, strict_version_check = FALSE)
mtcars_glm <- h2o.glm(x = c("wt", "cyl"),
y = "mpg",
training_frame = mtcars_h2o,
lambda_search = TRUE)
##
  |                                                                 |   0%
  |=================================================================| 100%
mtcars_glm
## Model Details:
## ==============
##
## H2ORegressionModel: glm
## Model ID: GLM_model_R_1502856002417_1
## GLM Model: summary
## family link regularization
## 1 gaussian identity Elastic Net (alpha = 0.5, lambda = 0.1013 )
## lambda_search
## 1 nlambda = 100, lambda.max = 10.132, lambda.min = 0.1013, lambda.1se = -1.0
## number_of_predictors_total number_of_active_predictors
## 1 2 2
## number_of_iterations training_frame
## 1 0 frame_rdd_36_a919eedd1cdba917bc77b163c73f46d1
##
## Coefficients: glm coefficients
## names coefficients standardized_coefficients
## 1 Intercept 38.941654 20.090625
## 2 cyl -1.468783 -2.623132
## 3 wt -3.034558 -2.969186
##
## H2ORegressionMetrics: glm
## ** Reported on training data. **
##
## MSE: 6.017684
## RMSE: 2.453097
## MAE: 1.940985
## RMSLE: 0.1114801
## Mean Residual Deviance : 6.017684
## R^2 : 0.8289895
## Null Deviance :1126.047
## Null D.o.F. :31
## Residual Deviance :192.5659
## Residual D.o.F. :29
## AIC :156.2425
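The fitted model can then be used for scoring. A minimal sketch that scores the training frame with h2o.predict() and converts the predictions back to a Spark DataFrame with rsparkling's as_spark_dataframe():
mtcars_pred <- h2o.predict(mtcars_glm, newdata = mtcars_h2o)
as_spark_dataframe(sc, mtcars_pred)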
sparklygraphs
The sparklygraphs extension wraps GraphFrames; here we compute PageRank over the highschool friendship network from the ggraph package:
highschool_tbl <- copy_to(sc, ggraph::highschool, "highschool", overwrite = TRUE)
# create a table with unique vertices using dplyr
vertices_tbl <- sdf_bind_rows(
highschool_tbl %>% distinct(from) %>% transmute(id = from),
highschool_tbl %>% distinct(to) %>% transmute(id = to)
)
# create a table with <source, destination> edges
edges_tbl <- highschool_tbl %>% transmute(src = from, dst = to)
# run pagerank over graph
gf_graphframe(vertices_tbl, edges_tbl) %>%
gf_pagerank(reset_prob = 0.15, max_iter = 10L, source_id = "1")
## Call: gf_pagerank(., reset_prob = 0.15, max_iter = 10L, source_id = "1")
##
## Algo parameters:
## Tolerance:
## Reset probability: 0.15
## Max iterations: 10
## Source ID: 1
##
## Result:
## Vertices
## # Source: table<sparklyr_tmp_4f515b1d2b47> [?? x 2]
## # Database: spark_connection
## id pagerank
## <dbl> <dbl>
## 1 12 0.012169139
## 2 12 0.012169139
## 3 59 0.001151867
## 4 59 0.001151867
## 5 1 0.155808486
## 6 1 0.155808486
## 7 20 0.035269712
## 8 20 0.035269712
## 9 45 0.023715824
## 10 45 0.023715824
## # ... with more rows
##
## Edges
## # Source: table<sparklyr_tmp_4f515804710c> [?? x 3]
## # Database: spark_connection
## src dst weight
## <dbl> <dbl> <dbl>
## 1 13 6 0.02777778
## 2 13 6 0.02777778
## 3 13 6 0.02777778
## 4 13 6 0.02777778
## 5 13 6 0.02777778
## 6 13 6 0.02777778
## 7 13 6 0.02777778
## 8 13 6 0.02777778
## 9 13 6 0.02777778
## 10 13 6 0.02777778
## # ... with more rows
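To work with the ranks further, the result can be stored and its vertices fed back into a dplyr pipeline. A sketch under the assumption that the package provides a gf_vertices() accessor for the result (as in later graphframes releases):
ranked <- gf_graphframe(vertices_tbl, edges_tbl) %>%
  gf_pagerank(reset_prob = 0.15, max_iter = 10L, source_id = "1")
# Highest-ranked vertices first
gf_vertices(ranked) %>%
  arrange(desc(pagerank)) %>%
  head(5) %>%
  collect()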
Custom data sources can also be read through spark_read_source(); for example, connecting to Cassandra with the DataStax spark-cassandra-connector package:
library(sparklyr)
config <- spark_config()
config$sparklyr.defaultPackages <- c(
"datastax:spark-cassandra-connector:2.0.0-RC1-s_2.11"
)
sc <- spark_connect(master = "local", config = config)
spark_read_source(
sc,
"emp",
"org.apache.spark.sql.cassandra",
list(keyspace = "dev", table = "emp")
)
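Once registered, the Cassandra-backed table can be queried like any other Spark table. A quick sketch, assuming a dev.emp table exists in the Cassandra cluster:
emp_tbl <- dplyr::tbl(sc, "emp")
dplyr::collect(head(emp_tbl, 5))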
invoke()
Basic use of Spark Streaming, as described in the Spark Streaming Programming Guide, implemented through sparklyr's invoke() interface:
# val ssc = new StreamingContext(conf, Seconds(1))
ssc <- invoke_new(
sc,
"org.apache.spark.streaming.StreamingContext",
spark_context(sc),
invoke_new(
sc,
"org.apache.spark.streaming.Duration",
10000L
)
)
# val lines = ssc.socketTextStream("localhost", 9999)
lines <- invoke(
ssc,
"socketTextStream",
"localhost",
9999L,
invoke_static(sc, "org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK")
)
# lines.count.print
invoke(lines, "count") %>% invoke("print")
# ssc.start()
invoke(ssc, "start")
Then, from a separate terminal, send a few lines of input to port 9999 with netcat:
nc -lk 9999
1
2
3
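Once data is flowing, the count for each batch is printed to the console. When finished, the streaming context can be stopped without tearing down the underlying Spark context; a sketch of the JVM call, using the stop(stopSparkContext, stopGracefully) method of Spark's StreamingContext:
invoke(ssc, "stop", FALSE, TRUE)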
Finally, disconnect from Spark:
spark_disconnect(sc)