---
title: "Tutorials on Using Spark in R with the sparklyr Package"
author: Nana Boateng
date: "`r format(Sys.time(), '%d %B, %Y')`"
output:
  html_notebook:
    df_print: paged
---


 
Spark makes it possible to store and process data for machine learning tasks on a cluster rather than on the local machine. This eases the memory requirements of the local computer, allowing for fast computation and model training.
More information on using Spark in R with the sparklyr package is available on the RStudio Spark website [here](http://spark.rstudio.com/mlib/).

```{r setup, include=FALSE}
knitr::opts_chunk$set(echo = TRUE,
                      warning = FALSE,
                      message = FALSE,
                      fig.align = 'default',
                      fig.cap = "Fig. 30",
                      out.width = "100%")

options(repr.plot.height = 5, repr.plot.width = 6)
options(tidyverse.quiet = TRUE)
```





```{r}
pacman::p_load(sparklyr, tidyverse)
library(dplyr)

conf <- spark_config()

# connect to a local Spark instance
sc <- spark_connect(master = "local", version = "2.1.0", config = conf)

# copy the nycflights13 flights data into Spark
spark_flights <- copy_to(sc, nycflights13::flights, "flights", overwrite = TRUE)
```
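
As a quick sanity check, we can confirm the connection and the size of the copied table. A minimal sketch using the sparklyr helpers spark_version() and sdf_nrow():

```{r}
# confirm the Spark version we connected to and the row count of the copied table
spark_version(sc)
sdf_nrow(spark_flights)
```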

Install Spark (a one-time setup step):
```{r}
#spark_install(version = "2.1.0")
```




```{r}
# copy the mtcars dataset into Spark
mtcars_tbl <- copy_to(sc, mtcars, "mtcars", overwrite = TRUE)

# transform the data set, then partition into 'training' and 'test' sets
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  mutate(cyl8 = cyl == 8) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)

partitions %>% head()
```
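
sdf_partition() returns a named list of Spark table references, which is why head() above prints the $training and $test elements. A quick sketch to verify how many rows landed in each split:

```{r}
# count the rows that landed in each random split
sdf_nrow(partitions$training)
sdf_nrow(partitions$test)
```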


#### Using dplyr
```{r}
iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights", overwrite = TRUE)
batting_tbl <- copy_to(sc, Lahman::Batting, "batting", overwrite = TRUE)
src_tbls(sc)
```



```{r}
# filter by departure delay and print the first few records
flights_tbl %>% filter(dep_delay == 2)
```



```{r}
delay <- flights_tbl %>%
  group_by(tailnum) %>%
  summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%
  filter(count > 20, dist < 2000, !is.na(delay)) %>%
  collect()

# plot delays
library(ggplot2)
ggplot(delay, aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)
```
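
dplyr verbs against a Spark table are translated to Spark SQL and evaluated lazily; collect() is what finally pulls the result into R. As a sketch, dbplyr's show_query() displays the generated SQL without executing it:

```{r}
# display the Spark SQL generated for a lazy dplyr pipeline
flights_tbl %>%
  group_by(tailnum) %>%
  summarise(count = n()) %>%
  show_query()
```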


#### Window Functions

dplyr supports Spark SQL window functions. Window functions are used in conjunction with mutate and filter to solve a wide range of problems. You can compare the dplyr syntax to the query it has generated by using dbplyr::sql_render().

```{r}
head(batting_tbl)
```



```{r}
batting_tbl %>%
  dplyr::select(playerID, yearID, teamID, G, AB:H) %>%
  arrange(playerID, yearID, teamID) %>%
  group_by(playerID) %>%
  filter(min_rank(desc(H)) <= 2 & H > 0)
```



```{r}
# Find the most and least delayed flight each day
bestworst <- flights_tbl %>%
  group_by(year, month, day) %>%
  select(dep_delay) %>% 
  filter(dep_delay == min(dep_delay) | dep_delay == max(dep_delay))
dbplyr::sql_render(bestworst)
```



#### Using SQL

It’s also possible to execute SQL queries directly against tables within a Spark cluster. The spark_connection object implements a DBI interface for Spark, so you can use dbGetQuery to execute SQL and return the result as an R data frame:

```{r}
library(DBI)
iris_preview <- dbGetQuery(sc, "SELECT * FROM iris LIMIT 10")
iris_preview
```
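
The same connection supports other DBI generics as well; a sketch (assuming sparklyr's DBI methods) that lists the registered tables and runs an aggregate query:

```{r}
# list tables registered with Spark, then run an aggregate query over iris
dbListTables(sc)
dbGetQuery(sc, "SELECT Species, COUNT(*) AS n FROM iris GROUP BY Species")
```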



#### Machine Learning

You can orchestrate machine learning algorithms in a Spark cluster via the machine learning functions within sparklyr. These functions connect to a set of high-level APIs built on top of DataFrames that help you create and tune machine learning workflows.

Here’s an example where we use ml_linear_regression to fit a linear regression model. We’ll use the built-in mtcars dataset to see whether we can predict a car’s fuel consumption (mpg) from its weight (wt) and the number of cylinders the engine contains (cyl). We’ll assume in each case that the relationship between mpg and each of our features is linear.


```{r}
# copy mtcars into Spark
mtcars_tbl <- copy_to(sc, mtcars, overwrite = TRUE)

# transform our data set, and then partition into 'training', 'test'
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  mutate(cyl8 = cyl == 8) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)

# fit a linear model to the training dataset
fit <- partitions$training %>%
  ml_linear_regression(response = "mpg", features = c("wt", "cyl"))
fit
```

For linear regression models produced by Spark, we can use summary() to learn a bit more about the quality of our fit, and the statistical significance of each of our predictors.

```{r}
summary(fit)
```
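
We can also score the held-out partition to gauge out-of-sample error. A sketch, assuming this sparklyr version's sdf_predict() (newer releases use ml_predict() instead):

```{r}
# score the test partition and compute the test-set RMSE
pred <- sdf_predict(fit, partitions$test) %>% collect()
sqrt(mean((pred$mpg - pred$prediction)^2))
```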

```{r}
# Principal Component Analysis (PCA)
# (kept for reference; food_world_cup comes from the fivethirtyeight package)

#pca <- food_world_cup %>%
#  mutate_each_(funs(as.numeric), countries) %>%
#  ml_pca(features = paste(colnames(food_world_cup)[-c(1:47)]))
```
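
Since food_world_cup isn't loaded in this notebook, here is a runnable sketch of ml_pca() using the iris table already in Spark:

```{r}
# PCA on the four numeric iris columns stored in Spark
iris_tbl %>%
  select(-Species) %>%
  ml_pca()
```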

Spark machine learning supports a wide array of algorithms and feature transformations, and as illustrated above it’s easy to chain these functions together with dplyr pipelines. To learn more, see the machine learning section.

#### Reading and Writing Data

You can read and write data in CSV, JSON, and Parquet formats. Data can be stored in HDFS, S3, or on the local filesystem of cluster nodes.

```{r}
temp_csv <- tempfile(fileext = ".csv")
temp_parquet <- tempfile(fileext = ".parquet")
temp_json <- tempfile(fileext = ".json")

spark_write_csv(iris_tbl, temp_csv)
iris_csv_tbl <- spark_read_csv(sc, "iris_csv", temp_csv)

spark_write_parquet(iris_tbl, temp_parquet)
iris_parquet_tbl <- spark_read_parquet(sc, "iris_parquet", temp_parquet)

spark_write_json(iris_tbl, temp_json)
iris_json_tbl <- spark_read_json(sc, "iris_json", temp_json)

src_tbls(sc)
```
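
By default the spark_read_* functions cache the table in memory. As a sketch, the standard memory argument registers the file lazily instead, which can be useful for large data:

```{r}
# register the CSV with Spark without eagerly caching it into memory
iris_csv_lazy_tbl <- spark_read_csv(sc, "iris_csv_lazy", temp_csv, memory = FALSE)
```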


#### Distributed R

You can execute arbitrary R code across your cluster using spark_apply(). For example, we can shift the numeric columns of iris by a gamma-distributed random draw as follows:

```{r}
spark_apply(iris_tbl, function(data) {
  # add a single gamma(shape = 2) random draw to the four numeric columns
  data[1:4] + rgamma(1, 2)
})
```


You can also group by columns to perform an operation over each group of rows and make use of any package within the closure. (Note that copy_to() replaces the dots in iris’s column names with underscores, which is why the formula below refers to Petal_Width rather than Petal.Width.)

```{r}
spark_apply(
  iris_tbl,
  function(e) broom::tidy(lm(Petal_Width ~ Petal_Length, e)),
  names = c("term", "estimate", "std.error", "statistic", "p.value"),
  group_by = "Species"
)
```


#### Extensions

The facilities used internally by sparklyr for its dplyr and machine learning interfaces are available to extension packages. Since Spark is a general-purpose cluster computing system, there are many potential applications for extensions (e.g. interfaces to custom machine learning pipelines, interfaces to third-party Spark packages, etc.).

Here’s a simple example that wraps a Spark text file line counting function with an R function:

```{r}
# write a CSV
flights_csv <- tempfile(fileext = ".csv")
write.csv(nycflights13::flights, flights_csv, row.names = FALSE, na = "")

# define an R interface to Spark line counting
count_lines <- function(sc, path) {
  spark_context(sc) %>%
    invoke("textFile", path, 1L) %>%
    invoke("count")
}

# call Spark to count the lines of the CSV
count_lines(sc, flights_csv)
```
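
The invoke() interface can call any method of the underlying Java objects; for example (a sketch using SparkContext's standard defaultParallelism method):

```{r}
# call a zero-argument Java method on the underlying SparkContext
spark_context(sc) %>% invoke("defaultParallelism")
```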


To learn more about creating extensions see the Extensions section of the sparklyr website.

#### Table Utilities

You can cache a table into memory with:

```{r}
tbl_cache(sc, "batting")
```

and unload from memory using:

```{r}
tbl_uncache(sc, "batting")
```

#### Connection Utilities

You can view the Spark web console using the spark_web function:

```{r}
spark_web(sc)
```

You can show the log using the spark_log function:

```{r}
spark_log(sc, n = 10)
```


Finally, we disconnect from Spark:

```{r}
spark_disconnect(sc)
```




#### Using H2O

rsparkling is a CRAN package from H2O that extends sparklyr to provide an interface into Sparkling Water. For instance, the following example installs, configures and runs h2o.glm:


```{r}
# The following two commands remove any previously installed H2O packages for R.
# if ("package:h2o" %in% search()) { detach("package:h2o", unload = TRUE) }
# if ("h2o" %in% rownames(installed.packages())) { remove.packages("h2o") }
# 
# # Next, we download packages that H2O depends on.
# pkgs <- c("methods","statmod","stats","graphics","RCurl","jsonlite","tools","utils")
# for (pkg in pkgs) {
#     if (! (pkg %in% rownames(installed.packages()))) { install.packages(pkg) }
# }
# 
# # Now we download, install, and initialize the H2O package for R. 
# # In this case we are using rel-tverberg 2 (3.10.3.2).
# install.packages("h2o", type = "source", repos = "http://h2o-release.s3.amazonaws.com/h2o/rel-tverberg/2/R")
# 
# #Install rsparkling
# 
# #The latest stable version of rsparkling on CRAN can be installed as follows:
# 
# install.packages("rsparkling")
# 
# #Alternatively, the development version can be installed from the "master" branch as follows:
# 
# library(devtools)
# #devtools::install_github("h2oai/rsparkling", ref = "master")
# 
# options(rsparkling.sparklingwater.version = "2.0.1") # Using Sparkling Water 2.0.1
# library(rsparkling) 


```


```{r}
options(rsparkling.sparklingwater.version = "2.1.18")

pacman::p_load(rsparkling)

library(sparklyr)
library(dplyr)

# The following two commands remove any previously installed H2O packages for R.
#if ("package:h2o" %in% search()) { detach("package:h2o", unload=TRUE) }
#if ("h2o" %in% rownames(installed.packages())) { remove.packages("h2o") }

# Next, we download, install and initialize the H2O package for R.
#install.packages("h2o", repos=(c("http://s3.amazonaws.com/h2o-release/h2o/master/1497/R",getOption("repos"))))


#install.packages("h2o")


library(h2o)
localH2O <- h2o.init()

sc <- spark_connect(master = "local", version = "2.1.0")

mtcars_tbl <- copy_to(sc, mtcars, "mtcars", overwrite = TRUE)

# convert a Spark DataFrame to an H2O Frame
mtcars_h2o <- as_h2o_frame(sc, mtcars_tbl, strict_version_check = FALSE)


```
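
Conversion works in both directions; a sketch using rsparkling's as_spark_dataframe() to send the H2O frame back to Spark:

```{r}
# round trip: convert the H2O frame back into a Spark DataFrame
mtcars_spark_again <- as_spark_dataframe(sc, mtcars_h2o)
mtcars_spark_again
```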




```{r}

mtcars_glm <- h2o.glm(x = c("wt", "cyl"), 
                      y = "mpg",
                      training_frame = mtcars_h2o,
                      lambda_search = TRUE)
```



```{r}
mtcars_glm
```
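
Predictions from the fitted model come back as an H2O frame; a minimal sketch with h2o.predict():

```{r}
# predict mpg for the training frame with the fitted GLM
h2o.predict(mtcars_glm, mtcars_h2o)
```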

```{r}
spark_disconnect(sc)
```


#### Connecting through Livy

Livy enables remote connections to Apache Spark clusters. Connecting to Spark clusters through Livy is under experimental development in sparklyr. 

Before connecting to Livy, you will need the connection information for an existing service running Livy. Otherwise, to test Livy in your local environment, you can install it and run it locally as follows:


```{r}
# livy_install()  # run once to install Livy locally
```


```{r,message=FALSE,warning=FALSE}
livy_service_start()
```



To connect, use the Livy service address as master and method = "livy" in spark_connect. Once the connection completes, use sparklyr as usual, for instance:


```{r,message=FALSE,warning=FALSE}
sc <- spark_connect(master = "http://localhost:8998", method = "livy")
copy_to(sc, iris)
```



```{r,message=FALSE,warning=FALSE}
spark_disconnect(sc)
```

Once you are done using Livy locally, you should stop the service with:

```{r}
livy_service_stop()
```

To connect to remote Livy clusters that support basic authentication, connect as follows:

```{r}
# config <- livy_config_auth("<username>", "<password>")
# sc <- spark_connect(master = "<address>", method = "livy", config = config)
# spark_disconnect(sc)
```

