# Session setup: start from an empty global environment and set how numbers
# print at the console.
# NOTE(review): wiping the global environment in a script is discouraged
# (it does not reset options or loaded packages); prefer a fresh R session.
# `all.names = TRUE` is spelled out instead of the partially matched `all=`;
# it also removes dot-prefixed objects.
rm(list = ls(all.names = TRUE))
# Print 5 significant digits, and penalise scientific notation (scipen = 10)
# so large counts such as yearly births print in fixed notation.
options(digits = 5, scipen = 10)
library(dplyr) # load before sparklyr
library(sparklyr) # the dplyr in spark
library(tidyr)
library(ggplot2)
library(dygraphs)
library(RColorBrewer)

# HDFS base directory that holds the input data read through the Spark
# context; file paths below are built with paste0(path, ...), so keep the
# trailing slash.
path <- "hdfs://192.168.1.100:9000/home/tonychuo/"
# Spark configuration: point sparklyr at the local Spark 2.1.0 install and
# set the executor/driver memory used by the connection below.
Sys.setenv(SPARK_HOME = "/usr/local/spark/spark-2.1.0-bin-hadoop2.7/")
config <- spark_config()

# Uncomment to pin the Spark web UI to a fixed port:
# config[["spark.ui.port"]] <- "4043"
config[["spark.executor.memory"]] <- "4G"
config[["spark.driver.memory"]] <- "4G"
# NOTE(review): this is a YARN-specific key; with a spark:// (standalone)
# master it is presumably ignored — confirm before relying on it.
config[["spark.yarn.executor.memoryOverhead"]] <- "4096"
# Connect to the Spark master at spark://hnamenode:7077 using `config`.
sc <- spark_connect(master = "spark://hnamenode:7077", config = config)

# Read babynames.csv from the HDFS base `path` and register it as the Spark
# table "baby"; `baby` is a lazy reference to that table.
baby <- spark_read_csv(sc, "baby", paste0(path, "baby/babynames.csv"))

# Cache the table in Spark memory.
# NOTE(review): spark_read_csv() defaults to memory = TRUE, which may already
# cache the table — confirm before removing this call.
tbl_cache(sc, "baby")

count(baby)  # number of rows in baby
## # Source: lazy query [?? x 1]
## # Database: spark_connection
## nn
## <dbl>
## 1 1858689
# FIG-1: Standard data-processing pipeline.
# Aggregate total births (in millions) per year and sex inside Spark, collect
# only that small summary into R, reshape it to one column per sex, and draw
# a stacked, range-selectable dygraph.
baby %>%
  group_by(year, sex) %>%
  # Spark SQL SUM() skips NULLs regardless; na.rm = TRUE makes that explicit
  # and silences dbplyr's "missing values are always removed" warning.
  summarise(millions = sum(n, na.rm = TRUE) / 1000000) %>%
  ungroup() %>%
  collect() %>%
  mutate(sex = recode(sex, "F" = "Female", "M" = "Male")) %>%
  # One column per sex (Female, Male), one row per year; year stays the first
  # column, which dygraph() uses as the x axis.
  spread(sex, millions) %>%
  dygraph(main = "No. New Borns", ylab = "millions") %>%
  dySeries("Female") %>%
  dySeries("Male") %>%
  dyOptions(stackedGraph = TRUE) %>%
  dyRangeSelector(height = 20)
# Trend of the most popular female names.
# For every year, find the female name(s) with the highest count (ties keep
# all of them), then pull the full yearly history of each of those names and
# plot its share of births in percent.
top_female_names <- baby %>%
  # Filtering on sex before ranking gives the same rows as ranking within
  # (year, sex) and then keeping "F", but the window runs on fewer rows.
  filter(sex == "F") %>%
  group_by(year) %>%
  filter(min_rank(desc(n)) == 1) %>%
  ungroup() %>%
  select(sex, name) %>%
  distinct() %>%
  # Explicit join keys instead of relying on (and printing) the natural join.
  inner_join(baby, by = c("sex", "name")) %>%
  select(year, name, prop) %>%
  collect() %>%
  # Blank out shares below 0.5% (prop < 0.005) and rescale the rest to percent.
  mutate(prop = ifelse(prop < 0.005, NA, 100 * prop)) %>%
  # One column per name, one row per year (year first, used as the x axis).
  spread(name, prop)

# The "Set2" palette has only 8 colors. With more series than that, dygraphs
# would reuse colors, so interpolate one distinct color per series instead.
# With 8 or fewer series, the original 8 Set2 colors are passed unchanged.
n_series <- ncol(top_female_names) - 1L
name_colors <- if (n_series <= 8L) {
  brewer.pal(8, "Set2")
} else {
  grDevices::colorRampPalette(brewer.pal(8, "Set2"))(n_series)
}

top_female_names %>%
  dygraph(main = "Trend of the Most Popular Names") %>%
  dyLegend(show = "auto", labelsSeparateLines = TRUE, width = 120) %>%
  dyOptions(strokeWidth = 2, colors = name_colors) %>%
  dyHighlight(
    highlightCircleSize = 5,
    highlightSeriesBackgroundAlpha = 0.3
  ) %>%
  dyRangeSelector()
# Cleanup: PLEASE always remember to close the Spark connection `sc` when
# you are done with it!
spark_disconnect(sc = sc)