rm(list = ls(all=TRUE))
options(digits=5, scipen=10)

library(dplyr)      # load before sparklyr
library(sparklyr)   # dplyr interface to Spark
library(tidyr)
library(ggplot2)
library(dygraphs)
library(RColorBrewer)
Path to HDFS - Hadoop Distributed File System
path = "hdfs://192.168.1.100:9000/home/tonychuo/"  
Connect to Spark Context
# Spark Context
Sys.setenv(SPARK_HOME="/usr/local/spark/spark-2.1.0-bin-hadoop2.7/")
config = spark_config()
# config$spark.ui.port = "4043"
config$spark.executor.memory = "4G"
config$spark.driver.memory = "4G" 
config$spark.yarn.executor.memoryOverhead = "4096"
sc <- spark_connect(master = "spark://hnamenode:7077", config = config)
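
Before reading any data it is worth confirming that the connection is alive. A minimal sanity check on the sc object created above (a sketch, not part of the original script) might be:

spark_version(sc)   # Spark version reported by the cluster
src_tbls(sc)        # tables currently registered in this connection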

Read CSV File

baby = spark_read_csv(sc, "baby", paste0(path, "baby/babynames.csv"))
tbl_cache(sc, "baby")   # cache the table in Spark memory
count(baby)             # no. rows in baby
## # Source:   lazy query [?? x 1]
## # Database: spark_connection
##        nn
##     <dbl>
## 1 1858689
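
Before aggregating, it can help to glance at the columns Spark sees. A small sketch, assuming the babynames table has the usual year, sex, name, n, prop columns:

head(baby, 5)       # first few rows, fetched lazily from Spark
sdf_schema(baby)    # column names and Spark data types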


Example 1 - Aggregate then Spread

baby %>% 
  group_by(year, sex) %>% 
  summarise(millions = sum(n)/1000000) %>% 
  collect() %>% 
  mutate(sex = recode(sex, "F"="Female", "M"="Male")) %>% 
  spread(sex, millions) %>% 
  dygraph(main="No. New Borns", ylab="millions") %>%
  dySeries("Female") %>% dySeries("Male") %>%
  dyOptions(stackedGraph = TRUE) %>%
  dyRangeSelector(height = 20)
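
Since ggplot2 is already loaded, the same aggregation can also be drawn as a static chart. The sketch below mirrors the dygraph above (stacked areas instead of stackedGraph = TRUE) and assumes the same year, sex, and n columns:

# Static (non-interactive) version of Example 1 using ggplot2
baby %>%
  group_by(year, sex) %>%
  summarise(millions = sum(n)/1000000) %>%
  collect() %>%                                          # pull the small summary into R
  mutate(sex = recode(sex, "F" = "Female", "M" = "Male")) %>%
  ggplot(aes(year, millions, fill = sex)) +
  geom_area() +                                          # areas stack by default
  labs(title = "No. New Borns", y = "millions")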


Example 2 - The Three Steps

FIG-1: Standard Data Processing Pipeline

The three steps the figure refers to are visible in the pipeline below: the heavy wrangling (group, filter, join) runs inside Spark, collect() then pulls the small result into local R, and the local tools (tidyr, dygraphs) reshape and plot it.


baby %>%
  # Step 1: wrangle in Spark - the most popular female name of each year
  group_by(year, sex) %>%
  filter(min_rank(desc(n)) == 1 & sex == "F") %>%
  ungroup() %>%
  select(sex, name) %>%
  distinct() %>%
  inner_join(baby) %>%             # join back to get the full history of those names
  select(year, name, prop) %>%
  # Step 2: collect the (now small) result into local R
  collect() %>%
  mutate(prop = ifelse(prop < 0.005, NA, 100*prop)) %>%   # hide shares below 0.5%, show percent
  spread(name, prop) %>%
  # Step 3: visualize locally
  dygraph("Trend of the Most Popular Names") %>%
  dyLegend("auto", labelsSeparateLines = TRUE, width = 120) %>%
  dyOptions(strokeWidth = 2, colors = brewer.pal(8, "Set2")) %>%
  dyHighlight(highlightCircleSize = 5,
              highlightSeriesBackgroundAlpha = 0.3) %>%
  dyRangeSelector()
## Joining, by = c("sex", "name")
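
The "Joining, by" message appears because inner_join() guesses the common columns. Passing the keys explicitly makes the join self-documenting and silent. The sketch below repeats the top of the pipeline only to have a named intermediate table (top_female is a name introduced here, not in the original):

top_female = baby %>%
  group_by(year, sex) %>%
  filter(min_rank(desc(n)) == 1 & sex == "F") %>%
  ungroup() %>%
  select(sex, name) %>%
  distinct()

# Explicit join keys avoid the "Joining, by" message above
top_female %>% inner_join(baby, by = c("sex", "name"))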


Always Remember to Disconnect !!!
# PLEASE Always Remember to Disconnect !!!
spark_disconnect(sc)