Spark is a library of code that can be used to process data in parallel on a cluster. The basic idea of Spark is parallelism: Spark breaks the data into pieces, sends the pieces to different computers for processing, then collects the results and combines them to get the final answer. More specifically, the basic computing paradigm is: distribute a large data set across multiple nodes, map functions over the data row by row, group the data by a key, and then perform aggregate operations.
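As a rough analogy (plain R with the base parallel package, not Spark code), the same break-apart / process / combine pattern looks like this:
# Plain-R analogy of the split / process-in-parallel / combine paradigm (not Spark)
library(parallel)
x <- 1:1000
pieces  <- split(x, cut(seq_along(x), 4))       # break the data into 4 pieces
partial <- mclapply(pieces, sum, mc.cores = 2)  # process each piece in parallel (use lapply on Windows)
Reduce(`+`, partial)                            # combine the partial results: 500500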
Basic concepts of Spark include RDDs, transformations, and actions.
RDD RDD stands for resilient distributed dataset. It is the basic data structure defined by Spark so that data can be distributed among cluster nodes. An RDD can hold a large amount of data; we can apply actions to an RDD to return values, and transformations to return new RDDs.
RDD transformations Commonly used RDD operations include map, filter, reduce, and reduceByKey. rdd.map: like the map function in Python, it applies a function to each element of the RDD, and the output can be saved as a new RDD. rdd.filter: filter defines a condition and returns a new RDD containing only the elements that satisfy the condition. rdd.reduce: it works like the reduce function in Python. Reduce takes two elements of the RDD at a time, performs some computation (e.g., a sum), and reduces them to a single value; strictly speaking, reduce is an action rather than a transformation, because it returns a value instead of a new RDD. rdd.reduceByKey: the same idea as reduce, but it works on a pair RDD, where a key is attached to each element; reduceByKey then performs a reduce within each key group. (A plain-R sketch of these operations follows the actions below.)
RDD actions Commonly used actions: rdd.collect: returns the values of the RDD to the driver. rdd.take(n): returns the first n elements of the RDD, which is handy for printing a quick sample.
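The RDD functions are not part of SparkR's public API, so as a conceptual sketch only, here is a plain-R analogy of these operations (base R's Map, Filter, Reduce, and tapply playing the roles of map, filter, reduce, and reduceByKey):
# Plain-R analogy of the RDD operations above (not Spark code)
x <- 1:10
Map(function(v) v^2, x)                  # map: apply a function to every element
Filter(function(v) v %% 2 == 0, x)       # filter: keep elements that satisfy a condition
Reduce(`+`, x)                           # reduce: combine elements pairwise into a single value (55)
pairs <- data.frame(key = c("a", "a", "b"), value = c(1, 2, 5))
tapply(pairs$value, pairs$key, sum)      # reduceByKey: reduce the values within each key group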
API options: RDD vs DataFrame 1. RDD API: the oldest released Spark API. An RDD is simply a set of Java or Scala objects representing data. Spark distributes the data within a cluster using Java serialization, which is the process of converting an object to a series of bytes. This is relatively expensive computationally, since both the data and its structure must be sent between nodes. 2. DataFrame API: introduced in Spark 1.3. Compared with RDDs, the DataFrame introduces the concept of a schema, which describes the structure of the data. The schema allows Spark to manage the structure itself and send only the data between nodes, which is more efficient than Java serialization. The DataFrame API also looks a lot like relational queries: we can use various relational operations similar to SQL expressions.
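As a quick illustration of the schema idea, once sc and sqlContext are initialized (as in the setup below), SparkR can print the structure it manages for a DataFrame:
# Assumes sc and sqlContext have been created as shown in the setup section below
df <- createDataFrame(sqlContext, iris)
printSchema(df)   # prints the column names and types that Spark tracks as the schema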
Spark can be downloaded directly from the link: http://spark.apache.org/downloads.html
After downloading, save the zipped file to a directory of your choice, and then unzip it.
We can then set up Spark in the R environment.
Sys.setenv(SPARK_HOME = "/Users/Wendy/spark-1.5.0-bin-hadoop2.6/")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master = "local",sparkEnvir = list(spark.driver.memory="2g"))
## Launching java with spark-submit command /Users/Wendy/spark-1.5.0-bin-hadoop2.6//bin/spark-submit sparkr-shell /var/folders/lg/7dvbrzm96hs16tnrz48z96fc0000gn/T//RtmpxXt3EO/backend_porta721df8d499
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, iris)
head(df)
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## 1 5.1 3.5 1.4 0.2 setosa
## 2 4.9 3.0 1.4 0.2 setosa
## 3 4.7 3.2 1.3 0.2 setosa
## 4 4.6 3.1 1.5 0.2 setosa
## 5 5.0 3.6 1.4 0.2 setosa
## 6 5.4 3.9 1.7 0.4 setosa
head(select(df, df$Sepal_Length, df$Species ))
## Sepal_Length Species
## 1 5.1 setosa
## 2 4.9 setosa
## 3 4.7 setosa
## 4 4.6 setosa
## 5 5.0 setosa
## 6 5.4 setosa
Selecting rows with ‘Sepal_Length’ greater than 5.5
head(filter(df, df$Sepal_Length >5.5))
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## 1 5.8 4.0 1.2 0.2 setosa
## 2 5.7 4.4 1.5 0.4 setosa
## 3 5.7 3.8 1.7 0.3 setosa
## 4 7.0 3.2 4.7 1.4 versicolor
## 5 6.4 3.2 4.5 1.5 versicolor
## 6 6.9 3.1 4.9 1.5 versicolor
Selecting only the ‘Sepal_Length’ and ‘Species’ columns for rows with ‘Sepal_Length’ greater than 5.5
head(select(filter(df, df$Sepal_Length >5.5), df$Sepal_Length, df$Species))
## Sepal_Length Species
## 1 5.8 setosa
## 2 5.7 setosa
## 3 5.7 setosa
## 4 7.0 versicolor
## 5 6.4 versicolor
## 6 6.9 versicolor
df2<-summarize(groupBy(df, df$Species), mean=mean(df$Sepal_Length), count=n(df$Sepal_Length))
head(df2)
## Species mean count
## 1 versicolor 5.936 50
## 2 setosa 5.006 50
## 3 virginica 6.588 50
Sort the output data frame by the mean sepal length
head(arrange(df2, desc(df2$mean)))
## Species mean count
## 1 virginica 6.588 50
## 2 versicolor 5.936 50
## 3 setosa 5.006 50
library(magrittr)
finaldf<-filter(df, df$Sepal_Length >5.5) %>%
  group_by(df$Species) %>%
  summarize(mean=mean(df$Sepal_Length))
arrange(finaldf, desc(finaldf$mean)) %>% head
## Species mean
## 1 virginica 6.622449
## 2 versicolor 6.120513
## 3 setosa 5.733333
registerTempTable(df,"df")
Run a SQL expression using sqlContext
dfSQL<-sql(sqlContext, "SELECT * FROM df WHERE Sepal_Length > 5.5")
Call collect to get a local data frame
dflocal<-collect(dfSQL)
print(dflocal[1:10,])
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## 1 5.8 4.0 1.2 0.2 setosa
## 2 5.7 4.4 1.5 0.4 setosa
## 3 5.7 3.8 1.7 0.3 setosa
## 4 7.0 3.2 4.7 1.4 versicolor
## 5 6.4 3.2 4.5 1.5 versicolor
## 6 6.9 3.1 4.9 1.5 versicolor
## 7 6.5 2.8 4.6 1.5 versicolor
## 8 5.7 2.8 4.5 1.3 versicolor
## 9 6.3 3.3 4.7 1.6 versicolor
## 10 6.6 2.9 4.6 1.3 versicolor
Here we continue to use the iris dataset. The goal is to predict sepal length. 1. Preparing a train/test data set: we want to randomly split the iris DataFrame 80/20, with 20% as the test set and 80% as the training set.
There is no split function in SparkR (version 1.5). To split the DataFrame, I first created an ID column, then randomly selected 20% of the data as the new test set. I matched the IDs from the test set with the IDs from the original data frame, and then created a new column to indicate whether each observation was assigned to the test set. Lastly, I selected the observations that are NOT in the test set and assigned them as the new training set.
This method is very tedious and probably does not scale well to larger data sets.
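For reference, newer SparkR releases (Spark 2.0 and later, not the 1.5 build used here) provide a randomSplit function that would replace this manual procedure; a minimal sketch, assuming such a version is installed:
# randomSplit is not available in Spark 1.5; in SparkR >= 2.0 the split is a one-liner
splits   <- randomSplit(df, weights = c(0.8, 0.2), seed = 42)
df_train <- splits[[1]]   # 80% of the rows
df_test  <- splits[[2]]   # 20% of the rows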
#create an ID column
iris$ID<-c(1:nrow(iris))
df <- createDataFrame(sqlContext, iris)
#total number of observations
nrow(df)
## [1] 150
#20% data as test set
df_test<-sample(df, FALSE, 0.2)
nrow(df_test)
## [1] 21
#80% data as train set
testID<-collect(select(df_test, "ID"))$ID
df$istest<-df$ID %in% testID
df_train<-subset(df, df$istest==FALSE)
nrow(df_train)
## [1] 129
#fit model
model<-glm(Sepal_Length ~ . - ID - istest , data=df_train, family="gaussian")
#look at model summary
summary(model)
## $coefficients
## Estimate
## (Intercept) 1.2303739
## Sepal_Width 0.5090809
## Petal_Length 0.7886571
## Petal_Width -0.2781813
## Species__versicolor 0.3035921
## Species__setosa 0.9335554
#make predictions
prediction<-predict(model, newData=df_test)
head(select(prediction, "Sepal_Length", "prediction"))
## Sepal_Length prediction
## 1 4.8 4.767474
## 2 5.4 5.063327
## 3 5.1 4.966378
## 4 4.9 4.869430
## 5 5.0 5.040655
## 6 5.3 5.174878
#mean of Sepal_Length
smean<-collect(agg(df_train, mean=mean(df_train$Sepal_Length)))$mean
smean
## [1] 5.805426
#Squared residual and squared total
prediction<-transform(
  prediction,
  s_res=(prediction$Sepal_Length - prediction$prediction)**2,
  s_tot=(prediction$Sepal_Length - smean)**2)
head(select(prediction, "Sepal_Length", "prediction", "s_res", "s_tot"))
## Sepal_Length prediction s_res s_tot
## 1 4.8 4.767474 0.0010579455 1.0108822
## 2 5.4 5.063327 0.1133489344 0.1643705
## 3 5.1 4.966378 0.0178548042 0.4976263
## 4 4.9 4.869430 0.0009345497 0.8197969
## 5 5.0 5.040655 0.0016528408 0.6487116
## 6 5.3 5.174878 0.0156554755 0.2554558
#Sum of squares
res<-collect(agg(prediction,
  ss_res=sum(prediction$s_res),
  ss_tot=sum(prediction$s_tot)))
res
## ss_res ss_tot
## 1 1.734297 18.25767
#R-squared
R2=1-(res$ss_res/res$ss_tot)
R2
## [1] 0.90501
Just like the sqlContext connects the Spark API to SQL queries, the H2OContext is the portal between Spark RDDs and H2O Frames. Sparkling Water provides quick conversion between the two with two simple operators: asH2OFrame(RDD) and asSchemaRDD(H2OFrame). asH2OFrame publishes a Spark RDD as an H2O Frame, and asSchemaRDD publishes an H2O Frame as a Spark RDD.
I picked two exercises to practice how to use Sparkling Water to create a simple machine-learning application. https://github.com/h2oai/sparkling-water/tree/master/examples#step-by-step-through-weather-data-example
http://blog.cloudera.com/blog/2015/10/how-to-build-a-machine-learning-app-using-sparkling-water-and-apache-spark/
I was able to follow the demonstration step by step. 1. Download Sparkling Water and point SPARK_HOME to the previously installed Spark. 2. Initiate a local cluster and create an H2O cloud inside the cluster. 3. Load the data sources as Spark RDDs/DataFrames and perform data munging. 4. Join multiple tables with the help of the Spark sqlContext. 5. Publish the processed data as an H2O Frame. 6. Train models using H2O. 7. Make model predictions. The code was written in Scala. Since I am not familiar with Scala syntax, it was difficult for me to make changes to the code. However, I have a good understanding of the concept and the workflow. I am planning to learn Scala; I found a few useful online resources and plan to take the tutorial on Scala Exercises (http://scala-exercises.47deg.com/index.html). Hopefully in the near future I will be able to fully utilize the power of Sparkling Water.