Understanding Spark

Spark is a library of code that can be used to process data in parallel on a cluster. The basic idea of Spark is parallelism: Spark breaks the data into pieces, sends the pieces to different computers for processing, then gathers the results back and combines them to produce the final answer. More specifically, the basic computing paradigm is: distribute a large data set across multiple nodes, map functions row by row, group the data by a key, and then perform aggregate operations.

Basic concepts of Spark include RDDs, transformations, and actions.

  1. RDD
    RDD stands for resilient distributed dataset. It is the basic data structure defined by Spark so that data can be distributed among cluster nodes. An RDD can hold a large amount of data; we can apply actions to an RDD to return values, and transformations to return new RDDs.

  2. RDD transformation
    Commonly used operations include map, filter, reduce, and reduceByKey (strictly speaking, reduce is an action, since it returns a value rather than a new RDD). A plain-R analogy of these operations is sketched after this list.
    rdd.map: map is like Python's map function; it applies a function to each element of the RDD, and we can save the output as a new RDD.
    rdd.filter: filter takes a condition and returns a new RDD containing only the elements that satisfy it.
    rdd.reduce: reduce works like Python's reduce function; it takes two elements of the RDD at a time, performs some computation (e.g., a sum), and keeps combining pairs until a single value remains.
    rdd.reduceByKey: same idea as reduce, but it works on pair RDDs, where a key is attached to each element; reduceByKey performs the reduce within each group of elements sharing the same key.

  3. RDD action
    Commonly used actions:
    rdd.collect: returns all elements of the RDD to the driver as a local collection.
    rdd.take(n): returns the first n elements, handy for quickly inspecting (printing) part of an RDD.
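
To make the semantics concrete, here is a plain base-R analogy of these operations (ordinary R code, not the Spark API; a small local list stands in for a distributed RDD):

nums <- list(1, 2, 3, 4)
# map: apply a function to every element
squared <- Map(function(x) x^2, nums)           # 1, 4, 9, 16
# filter: keep only the elements satisfying a condition
evens <- Filter(function(x) x %% 2 == 0, nums)  # 2, 4
# reduce: combine two elements at a time into a single value
total <- Reduce(`+`, nums)                      # 10
# reduceByKey: reduce within groups that share a key
keys   <- c("a", "b", "a", "b")
values <- c(1, 2, 3, 4)
tapply(values, keys, sum)                       # a = 4, b = 6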

API options: RDD vs. DataFrame
1. RDD API:
It is the oldest Spark API. It is simply a set of Java or Scala objects representing data. Spark distributes data within a cluster using Java serialization, which is the process of converting an object to a series of bytes. It is relatively computationally expensive since it requires sending both the data and its structure between nodes.
2. DataFrame API
The DataFrame API was introduced in Spark 1.3. Compared to RDDs, a DataFrame introduces the concept of a schema, which describes the structure of the data. The schema lets Spark manage the structure itself and pass only the data between nodes, which is more efficient than Java serialization. The DataFrame API reads a lot like relational queries: we can use various relational operations similar to SQL expressions.
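
As a quick illustration of the schema idea, once a SparkR DataFrame exists (such as the iris-based df created in the SparkR section below), its schema can be inspected directly:

# assumes df has already been created with createDataFrame(sqlContext, iris)
printSchema(df)   # lists each column with its name and data type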

SparkR

Download and Installation

Spark can be downloaded directly from the link: http://spark.apache.org/downloads.html

After downloading, save the archive to a directory of your choice and unzip it.
We can then set up Spark in the R environment.

  1. Set the system environment by pointing the R session to the installed Spark.
Sys.setenv(SPARK_HOME = "/Users/Wendy/spark-1.5.0-bin-hadoop2.6/")
  2. Set the library path and load the SparkR library
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
  3. Initialize the Spark context and SQL context
sc <- sparkR.init(master = "local",sparkEnvir = list(spark.driver.memory="2g"))
## Launching java with spark-submit command /Users/Wendy/spark-1.5.0-bin-hadoop2.6//bin/spark-submit   sparkr-shell /var/folders/lg/7dvbrzm96hs16tnrz48z96fc0000gn/T//RtmpxXt3EO/backend_porta721df8d499
sqlContext <- sparkRSQL.init(sc)

DataFrame Operations

  1. Create a DataFrame: DataFrames can be created from a local R data frame, from a direct data source, or from a Hive table. The easiest way is to convert a local R data frame into a SparkR DataFrame; here I use the famous iris dataset as an example. (A sketch of loading from an external data source follows the output below.)
df <- createDataFrame(sqlContext, iris)
head(df)
##   Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## 1          5.1         3.5          1.4         0.2  setosa
## 2          4.9         3.0          1.4         0.2  setosa
## 3          4.7         3.2          1.3         0.2  setosa
## 4          4.6         3.1          1.5         0.2  setosa
## 5          5.0         3.6          1.4         0.2  setosa
## 6          5.4         3.9          1.7         0.4  setosa
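
For completeness, data can also be loaded straight from an external data source with read.df. A minimal sketch, assuming a JSON file at a hypothetical path (the file name is only illustrative):

# hypothetical path; other supported sources work the same way
people <- read.df(sqlContext, "/path/to/people.json", source = "json")
head(people)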
  2. Select and Filter:
    Selecting only the ‘Sepal_Length’ and ‘Species’ columns
head(select(df, df$Sepal_Length, df$Species )) 
##   Sepal_Length Species
## 1          5.1  setosa
## 2          4.9  setosa
## 3          4.7  setosa
## 4          4.6  setosa
## 5          5.0  setosa
## 6          5.4  setosa

Selecting rows with ‘Sepal_Length’ greater than 5.5

head(filter(df, df$Sepal_Length >5.5))
##   Sepal_Length Sepal_Width Petal_Length Petal_Width    Species
## 1          5.8         4.0          1.2         0.2     setosa
## 2          5.7         4.4          1.5         0.4     setosa
## 3          5.7         3.8          1.7         0.3     setosa
## 4          7.0         3.2          4.7         1.4 versicolor
## 5          6.4         3.2          4.5         1.5 versicolor
## 6          6.9         3.1          4.9         1.5 versicolor

Selecting only the ‘Sepal_Length’ and ‘Species’ columns for rows with ‘Sepal_Length’ greater than 5.5

head(select(filter(df, df$Sepal_Length >5.5), df$Sepal_Length, df$Species))
##   Sepal_Length    Species
## 1          5.8     setosa
## 2          5.7     setosa
## 3          5.7     setosa
## 4          7.0 versicolor
## 5          6.4 versicolor
## 6          6.9 versicolor
  3. Grouping and Aggregation
    Calculating the mean sepal length of each species, as well as the number of observations per species.
df2<-summarize(groupBy(df, df$Species), mean=mean(df$Sepal_Length), count=n(df$Sepal_Length))
head(df2)
##      Species  mean count
## 1 versicolor 5.936    50
## 2     setosa 5.006    50
## 3  virginica 6.588    50

Sort the output data frame by the mean sepal length

head(arrange(df2, desc(df2$mean)))
##      Species  mean count
## 1  virginica 6.588    50
## 2 versicolor 5.936    50
## 3     setosa 5.006    50
  4. Combine queries with magrittr
    In R, we can chain DataFrame operations with the magrittr pipe. Here is an example that combines the above operations into one; note that the means differ from the previous section because the data is first filtered to Sepal_Length > 5.5.
library(magrittr)
finaldf<-filter(df, df$Sepal_Length >5.5) %>%
  group_by(df$Species)%>%
  summarize(mean=mean(df$Sepal_Length))
arrange(finaldf, desc(finaldf$mean)) %>% head
##      Species     mean
## 1  virginica 6.622449
## 2 versicolor 6.120513
## 3     setosa 5.733333
  5. SQL Queries
    We can register a Spark DataFrame as a SQL table, which enables us to run SQL queries and return the output as a data frame.
    Register the Spark DataFrame as a table.
registerTempTable(df,"df")

Run a SQL expression using the sqlContext

dfSQL<-sql(sqlContext, "SELECT * FROM df WHERE Sepal_Length > 5.5")

Call collect to get a local dataframe

dflocal<-collect(dfSQL)
print(dflocal[1:10,])
##    Sepal_Length Sepal_Width Petal_Length Petal_Width    Species
## 1           5.8         4.0          1.2         0.2     setosa
## 2           5.7         4.4          1.5         0.4     setosa
## 3           5.7         3.8          1.7         0.3     setosa
## 4           7.0         3.2          4.7         1.4 versicolor
## 5           6.4         3.2          4.5         1.5 versicolor
## 6           6.9         3.1          4.9         1.5 versicolor
## 7           6.5         2.8          4.6         1.5 versicolor
## 8           5.7         2.8          4.5         1.3 versicolor
## 9           6.3         3.3          4.7         1.6 versicolor
## 10          6.6         2.9          4.6         1.3 versicolor

Machine Learning - generalized linear regression

Here we continue to use the iris dataset. The goal is to predict sepal length.
1. Preparing a train/test data set
We want to randomly split the iris DataFrame 20/80: 20% as the test set and 80% as the training set.

There is no built-in split function in this version of SparkR. To split the DataFrame, I first created an ID column and randomly selected 20% of the data as the test set. I then matched the IDs from the test set against the IDs in the original data frame, created a new column indicating whether each observation was assigned to the test set, and finally selected the observations that are NOT in the test set as the training set.

This method is tedious and probably does not scale well to larger data sets. (A note on a simpler alternative in newer SparkR releases follows the code below.)

#create an ID column
iris$ID<-c(1:nrow(iris))
df <- createDataFrame(sqlContext, iris)
#total number of observations
nrow(df)
## [1] 150
#20% data as test set
df_test<-sample(df, FALSE, 0.2)
nrow(df_test)  
## [1] 21
#80% data as train set
testID<-collect(select(df_test, "ID"))$ID
df$istest<-df$ID %in% testID
df_train<-subset(df, df$istest==FALSE)
nrow(df_train)
## [1] 129
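
As an aside, later SparkR releases (Spark 2.0 and up) added a randomSplit function that makes this step much less painful. A minimal sketch, assuming one of those newer versions rather than the 1.5.0 build used here:

# requires SparkR >= 2.0; not available in the spark-1.5.0 build used above
splits   <- randomSplit(df, weights = c(0.8, 0.2), seed = 42)
df_train <- splits[[1]]
df_test  <- splits[[2]]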
  2. Train a linear model
#fit model
model<-glm(Sepal_Length ~ . - ID - istest , data=df_train, family="gaussian")
#look at model summary
summary(model)
## $coefficients
##                       Estimate
## (Intercept)          1.2303739
## Sepal_Width          0.5090809
## Petal_Length         0.7886571
## Petal_Width         -0.2781813
## Species__versicolor  0.3035921
## Species__setosa      0.9335554
  3. Model evaluation using the test set
    SparkR doesn’t have built-in functions to calculate model error, so I manually calculated the R-squared. By definition, R2 = 1 - (residual sum of squares / total sum of squares).
#make predictions
prediction<-predict(model, newData=df_test)
head(select(prediction, "Sepal_Length", "prediction"))
##   Sepal_Length prediction
## 1          4.8   4.767474
## 2          5.4   5.063327
## 3          5.1   4.966378
## 4          4.9   4.869430
## 5          5.0   5.040655
## 6          5.3   5.174878
#mean of Sepal_Length
smean<-collect(agg(df_train, mean=mean(df_train$Sepal_Length)))$mean
smean
## [1] 5.805426
#Squared residual and squared total
prediction<-transform(
  prediction,
  s_res=(prediction$Sepal_Length - prediction$prediction)**2,
  s_tot=(prediction$Sepal_Length - smean)**2)
head(select(prediction, "Sepal_Length", "prediction", "s_res", "s_tot"))
##   Sepal_Length prediction        s_res     s_tot
## 1          4.8   4.767474 0.0010579455 1.0108822
## 2          5.4   5.063327 0.1133489344 0.1643705
## 3          5.1   4.966378 0.0178548042 0.4976263
## 4          4.9   4.869430 0.0009345497 0.8197969
## 5          5.0   5.040655 0.0016528408 0.6487116
## 6          5.3   5.174878 0.0156554755 0.2554558
#Sum of squares
res<-collect(agg(prediction, 
                 ss_res=sum(prediction$s_res),
                 ss_tot=sum(prediction$s_tot)))
res
##     ss_res   ss_tot
## 1 1.734297 18.25767
#R-squared
R2=1-(res$ss_res/res$ss_tot)
R2
## [1] 0.90501
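
One last housekeeping step: when all the SparkR work is done, the Spark context can be shut down with SparkR's own teardown call.

# stop the Spark context once the analysis is finished
sparkR.stop()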

Thoughts

SparkR is an easy way for me to transition from R to Spark, and the DataFrame API provides a very natural way to code. In this exercise, I practiced the basic DataFrame operations and built a simple linear regression model. However, I feel there are some limitations in the current SparkR for machine learning. For example:
1. There should be an easier way to split train/test sets.
2. The summary function for models doesn’t provide much information beyond the coefficients. It would be nice to have error estimates and significance levels.
3. There need to be more powerful models that can handle more complex formulas.
I think the ML libraries for PySpark are more complete than SparkR, and there is more documentation as well. My next step is to learn PySpark and also start learning the fundamentals of Scala.


Sparkling Water

Background

Sparkling Water is the happy marriage of two open source technologies, Apache Spark and H2O. It combines the advanced machine-learning algorithms of H2O with the execution power of Spark. The standard Sparkling Water workflow contains four steps:
1. Load and parse data
2. Data munging
3. Create models (Deep learning, GBM)
4. Apply models to make better predictions


Just as sqlContext connects the Spark API to SQL queries, H2OContext is the portal between Spark RDDs and H2O Frames. Sparkling Water provides quick conversion between the two with two simple operators: asH2OFrame(RDD) and asSchemaRDD(H2OFrame). asH2OFrame publishes a Spark RDD as an H2O Frame, and asSchemaRDD publishes an H2O Frame as a Spark RDD.

Exercises and thoughts

I picked two exercises to practice using Sparkling Water to create a simple machine-learning application:
https://github.com/h2oai/sparkling-water/tree/master/examples#step-by-step-through-weather-data-example
http://blog.cloudera.com/blog/2015/10/how-to-build-a-machine-learning-app-using-sparkling-water-and-apache-spark/
I was able to follow the demonstrations step by step:
1. Download Sparkling Water and point SPARK_HOME to the previously installed Spark.
2. Initiate a local cluster and create an H2O cloud inside the cluster.
3. Load the data source as a Spark RDD/DF and perform data munging.
4. Join multiple tables with the help of the Spark sqlContext.
5. Publish the processed data as an H2O Frame.
6. Train models using H2O.
7. Model prediction.
The code was written in Scala. Since I am not familiar with Scala syntax, it was difficult for me to make changes to the code. However, I have a good understanding of the concept and the workflow. I am planning to learn Scala. I found a few useful online resources and plan to take the tutorial on Scala Exercises (http://scala-exercises.47deg.com/index.html). Hopefully in the near future I will be able to fully utilize the power of Sparkling Water.