sparklyr: Use Spark from R like dplyr

In this document, we visualize US flight data and build a predictive model of flight delays with sparklyr. The flight visualization code is based on this article: http://flowingdata.com/2011/05/11/how-to-map-connections-with-great-circles/

If you are interested in sparklyr, you can start with the official documentation, or try it on a Spark cluster built with Cloudera Director, as we did for this post.

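This document assumes you already have the following tables, both of which are read by the code below: airlines_bi_pq (flight records) and airports_new_pq (airport IATA codes with latitude and longitude).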

You should make these tables available through Apache Hive or Apache Impala (incubating) with Hue. After installing sparklyr and launching the Spark cluster from the Cloudera Director configuration file, you can open RStudio Server at <sparklyr-gateway-hostname>:8787 in your browser and log in with rsuser/cloudera.

If you are new to sparklyr, the official cheatsheet is very helpful.

Connect to Spark with sparklyr

Let’s connect to your Spark cluster with sparklyr. For this post, we also installed Spark 2.0 on the cluster. Before running the following code, install the additional R packages with install.packages(c("ggplot2", "maps", "geosphere", "dplyr")).

# Load libraries
library(ggplot2)
library(maps)
library(geosphere)
## Loading required package: sp
library(sparklyr)
library(dplyr)
## 
## Attaching package: 'dplyr'
## The following objects are masked from 'package:stats':
## 
##     filter, lag
## The following objects are masked from 'package:base':
## 
##     intersect, setdiff, setequal, union
# Configure cluster
config <- spark_config()
config$spark.driver.cores   <- 4
config$spark.executor.cores <- 4
config$spark.executor.memory <- "4G"
#spark_home <- "/opt/cloudera/parcels/CDH/lib/spark"
#spark_version <- "1.6.2"
spark_home <- "/opt/cloudera/parcels/SPARK2/lib/spark2"
spark_version <- "2.0.0"
sc <- spark_connect(master="yarn-client", version=spark_version, config=config, spark_home=spark_home)

Read the table from S3 and plot with ggplot

Count the flights in the airlines_bi_pq table by year.

airlines <- tbl(sc, "airlines_bi_pq")
airlines
## Source:   query [1.235e+08 x 30]
## Database: spark connection master=yarn-client app=sparklyr local=FALSE
## 
##     year month   day dayofweek dep_time crs_dep_time arr_time crs_arr_time
##    <int> <int> <int>     <int>    <int>        <int>    <int>        <int>
## 1   2006     6     3         6      840          830     1006         1007
## 2   2006     6     4         7      830          830      958         1007
## 3   2006     6     5         1      827          830     1004         1007
## 4   2006     6     6         2      830          830     1006         1007
## 5   2006     6     7         3      831          830     1005         1007
## 6   2006     6     8         4      826          830      958         1007
## 7   2006     6     9         5      826          830      958         1007
## 8   2006     6    10         6      845          830     1011         1007
## 9   2006     6    11         7      828          830      950         1007
## 10  2006     6    12         1      829          830      956         1007
## # ... with 1.235e+08 more rows, and 22 more variables: carrier <chr>,
## #   flight_num <int>, tail_num <int>, actual_elapsed_time <int>,
## #   crs_elapsed_time <int>, airtime <int>, arrdelay <int>, depdelay <int>,
## #   origin <chr>, dest <chr>, distance <int>, taxi_in <int>,
## #   taxi_out <int>, cancelled <int>, cancellation_code <chr>,
## #   diverted <int>, carrier_delay <int>, weather_delay <int>,
## #   nas_delay <int>, security_delay <int>, late_aircraft_delay <int>,
## #   date_yyyymm <chr>
airline_counts_by_year <- airlines %>% group_by(year) %>% summarise(count=n()) %>% collect
airline_counts_by_year %>% tbl_df %>% print(n=nrow(.))
## # A tibble: 22 × 2
##     year   count
##    <int>   <dbl>
## 1   1990 5270893
## 2   2003 6488540
## 3   2007 7453215
## 4   2006 7141922
## 5   1997 5411843
## 6   1988 5202096
## 7   1994 5180048
## 8   2004 7129270
## 9   1991 5076925
## 10  1996 5351983
## 11  1989 5041200
## 12  1998 5384721
## 13  1987 1311826
## 14  1995 5327435
## 15  2001 5967780
## 16  1992 5092157
## 17  2005 7140596
## 18  2000 5683047
## 19  2008 7009728
## 20  1999 5527884
## 21  2002 5271359
## 22  1993 5070501

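sparklyr tables are evaluated lazily: the query runs on Spark only when its result is needed, so use collect to bring the result into R as a local data frame. Note that the collected rows above are not sorted by year.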

Plot summarized data with ggplot:

g <- ggplot(airline_counts_by_year, aes(x=year, y=count))
g <- g + geom_line(
  colour = "magenta",
  linetype = 1,
  size = 0.8
)
g <- g + xlab("Year")
g <- g + ylab("Flight number")
g <- g + ggtitle("US flights")
plot(g)

The plot shows a drop in the number of flights in 2002. But why?
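
One way to locate the drop numerically is to sort the collected counts and compute the year-over-year change. The following is a minimal base R sketch, assuming a small local data frame (hypothetically named counts) that copies five rows from the output above; in practice you would apply the same steps to airline_counts_by_year.

```r
# Five rows copied from the collected output above (hypothetical local copy).
counts <- data.frame(
  year  = c(2001, 2003, 1999, 2002, 2000),
  count = c(5967780, 6488540, 5527884, 5271359, 5683047)
)

# The collected rows are not ordered, so sort by year first.
counts <- counts[order(counts$year), ]

# Year-over-year change; the first year has no predecessor.
previous <- c(NA, head(counts$count, -1))
counts$change <- counts$count - previous
counts$pct_change <- round(100 * counts$change / previous, 1)

# Year with the largest decrease (which.min skips the NA).
worst_year <- counts$year[which.min(counts$change)]
print(counts)
print(worst_year)
```

With these five rows, the largest decrease falls in 2002, which matches what the line plot suggests.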

Look at the number of flights between 2001 and 2003

Next, let’s dig into the 2002 drop by plotting the monthly number of flights between 2001 and 2003.

airline_counts_by_month <- airlines %>% filter(year>= 2001 & year<=2003) %>% group_by(year, month) %>% summarise(count=n()) %>% collect

g <- ggplot(
  airline_counts_by_month, 
  aes(x=as.Date(sprintf("%d-%02d-01", airline_counts_by_month$year, airline_counts_by_month$month)), y=count)
  )
g <- g + geom_line(
  colour = "magenta",
  linetype = 1,
  size = 0.8
)
g <- g + xlab("Year/Month")
g <- g + ylab("Flight number")
g <- g + ggtitle("US flights")
plot(g)

The number of flights decreased significantly after September 2001, which we can attribute to the effect of 9/11. In this way, sparklyr makes exploratory data analysis easier for large-scale data, so we can obtain new insights quickly.
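
The x axis of the monthly plot is built by formatting each year and month pair as the first day of that month. Here is a minimal base R sketch of the same construction, using a hypothetical three-row data frame (monthly, with made-up counts) in place of airline_counts_by_month:

```r
# Hypothetical stand-in for the collected year/month counts (values are made up).
monthly <- data.frame(
  year  = c(2002L, 2001L, 2001L),
  month = c(1L, 10L, 9L),
  count = c(1L, 2L, 3L)
)

# "%d-%02d-01" zero-pads the month, giving ISO dates such as "2001-09-01".
monthly$month_start <- as.Date(sprintf("%d-%02d-01", monthly$year, monthly$month))

# Sorting by the Date column puts the rows in chronological order.
monthly <- monthly[order(monthly$month_start), ]
print(monthly)
```

Storing the date as a column is one option that lets aes refer to it by name, instead of indexing the data frame with $ inside aes.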

Summarize flight data by year, carrier, origin and dest

Next, we count flights by year, carrier, origin, and destination.

flights <- airlines %>% group_by(year, carrier, origin, dest) %>% summarise(count=n()) %>% collect
flights
## Source: local data frame [123,737 x 5]
## Groups: year, carrier, origin [24,636]
## 
##     year carrier origin  dest count
##    <int>   <chr>  <chr> <chr> <dbl>
## 1   2008      US    SJU   PHL  1109
## 2   2008      US    PDX   LAS   784
## 3   1999      CO    SLC   CLE   105
## 4   1997      WN    HOU   LIT   364
## 5   2004      CO    TPA   IAH  1807
## 6   2004      CO    FLL   IAH  1823
## 7   2005      DL    MCI   ATL  3502
## 8   2007      YV    CLT   BNA   383
## 9   2005      DL    JFK   SLC   901
## 10  2007      YV    ORD   ABE   794
## # ... with 123,727 more rows
airports <- tbl(sc, "airports_new_pq") %>% collect

Now we extract AA’s flights in 2007, sorted by count.

flights_aa <- flights %>% filter(year==2007) %>% filter(carrier=="AA") %>% arrange(count)
flights_aa
## Source: local data frame [460 x 5]
## Groups: year, carrier, origin [82]
## 
##     year carrier origin  dest count
##    <int>   <chr>  <chr> <chr> <dbl>
## 1   2007      AA    BOS   JFK     1
## 2   2007      AA    MIA   FLL     1
## 3   2007      AA    BOS   SDF     1
## 4   2007      AA    MIA   XNA     1
## 5   2007      AA    BNA   IAD     1
## 6   2007      AA    XNA   MIA     2
## 7   2007      AA    OGG   ORD     8
## 8   2007      AA    ORD   OGG     8
## 9   2007      AA    EGE   LGA    17
## 10  2007      AA    MTJ   ORD    17
## # ... with 450 more rows

Plot flights on a map

Let’s plot the number of AA flights in 2007 on a map. You can change the filter condition to plot other airlines.

# draw map with line of AA
xlim <- c(-171.738281, -56.601563)
ylim <- c(12.039321, 71.856229)

# Color settings
pal <- colorRampPalette(c("#333333", "white", "#1292db"))
colors <- pal(100)

map("world", col="#6B6363", fill=TRUE, bg="#000000", lwd=0.05, xlim=xlim, ylim=ylim)

maxcnt <- max(flights_aa$count)
for (j in seq_len(nrow(flights_aa))) {
  air1 <- airports[airports$iata == flights_aa[j,]$origin,]
  air2 <- airports[airports$iata == flights_aa[j,]$dest,]
  
  inter <- gcIntermediate(c(air1[1,]$longitude, air1[1,]$latitude), c(air2[1,]$longitude, air2[1,]$latitude), n=100, addStartEnd=TRUE)
  # Clamp to at least 1: low counts round to 0, and colors[0] is empty
  colindex <- max(1, round( (flights_aa[j,]$count / maxcnt) * length(colors) ))
  
  lines(inter, col=colors[colindex], lwd=0.8)
}
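
The loop picks a line color by scaling each route count to an index into the 100-color palette. The sketch below isolates that mapping in base R. The helper name count_to_color_index and the route counts are hypothetical, and the lower clamp to 1 is an added safeguard: R vectors are 1-indexed, so a rounded index of 0 would select no color.

```r
# Same palette as in the map code.
pal <- colorRampPalette(c("#333333", "white", "#1292db"))
colors <- pal(100)

# Hypothetical helper: scale a count to an index in 1..n_colors.
count_to_color_index <- function(count, max_count, n_colors) {
  idx <- round(count / max_count * n_colors)
  pmin(pmax(idx, 1), n_colors)
}

route_counts <- c(1, 17, 500, 1000)   # made-up route counts
idx <- count_to_color_index(route_counts, max(route_counts), length(colors))
print(idx)
print(colors[idx])
```

Without the clamp, the first route here would map to index 0 and get no color at all, which is the case the low-count AA routes would hit.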

Build a predictive model for delay with linear regression

We will build a predictive model of arrival delay with linear regression from Spark MLlib.

First, we prepare the training data. To handle categorical columns, use ft_string_indexer to convert each string value into a numeric index.
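
To illustrate what the indexing step produces: Spark’s StringIndexer, which ft_string_indexer wraps, by default assigns index 0 to the most frequent label, 1 to the next most frequent, and so on. The base R sketch below mimics that behavior on a made-up carrier vector. It is an illustration only, not the sparklyr implementation.

```r
# Made-up carrier column.
carrier <- c("AA", "DL", "AA", "WN", "AA", "DL")

# Labels ordered by descending frequency: AA (3), DL (2), WN (1).
labels_by_freq <- names(sort(table(carrier), decreasing = TRUE))

# Zero-based position of each value within that ordering.
carrier_cat <- match(carrier, labels_by_freq) - 1
print(data.frame(carrier, carrier_cat))
```

Because the result is a single numeric column, a linear model treats it as an ordered quantity, so the coefficient of an indexed column depends on this frequency ordering.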

# build predictive model with linear regression
partitions <- airlines %>%
  filter(arrdelay >= 5) %>%
  sdf_mutate(
       carrier_cat = ft_string_indexer(carrier),
       origin_cat = ft_string_indexer(origin),
       dest_cat = ft_string_indexer(dest)
  ) %>%
  mutate(hour = floor(dep_time/100)) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)
fit <- partitions$training %>%
   ml_linear_regression(
     response = "arrdelay",
     features = c(
        "month", "hour", "dayofweek", "carrier_cat", "depdelay", "origin_cat", "dest_cat", "distance"
       )
    )
## * Dropped 44822 rows with 'na.omit' (22519745 => 22474923)
fit
## Call: ml_linear_regression(., response = "arrdelay", features = c("month", "hour", "dayofweek", "carrier_cat", "depdelay", "origin_cat", "dest_cat", "distance"))
## 
## Coefficients:
##  (Intercept)        month         hour    dayofweek  carrier_cat 
## 11.778682025 -0.046258441 -0.150788013 -0.235037195  0.147276224 
##     depdelay   origin_cat     dest_cat     distance 
##  0.903219105 -0.005712402 -0.023306845  0.001430704
summary(fit)
## Call: ml_linear_regression(., response = "arrdelay", features = c("month", "hour", "dayofweek", "carrier_cat", "depdelay", "origin_cat", "dest_cat", "distance"))
## 
## Deviance Residuals: (approximate):
##       Min        1Q    Median        3Q       Max 
## -1291.943    -7.856    -2.005     4.618   344.846 
## 
## Coefficients:
##                Estimate  Std. Error   t value  Pr(>|t|)    
## (Intercept)  1.1779e+01  1.6486e-02   714.468 < 2.2e-16 ***
## month       -4.6258e-02  9.8378e-04   -47.021 < 2.2e-16 ***
## hour        -1.5079e-01  7.4880e-04  -201.373 < 2.2e-16 ***
## dayofweek   -2.3504e-01  1.7646e-03  -133.195 < 2.2e-16 ***
## carrier_cat  1.4728e-01  7.2567e-04   202.951 < 2.2e-16 ***
## depdelay     9.0322e-01  8.7020e-05 10379.398 < 2.2e-16 ***
## origin_cat  -5.7124e-03  9.8910e-05   -57.753 < 2.2e-16 ***
## dest_cat    -2.3307e-02  9.4674e-05  -246.179 < 2.2e-16 ***
## distance     1.4307e-03  6.4905e-06   220.431 < 2.2e-16 ***
## ---
## Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
## 
## R-Squared: 0.8333
## Root Mean Squared Error: 16.33

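Now we can see the trained linear regression model and its coefficients. The departure delay (depdelay) has by far the largest t value, and the model reaches an R-squared of 0.8333.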
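
To make the coefficients concrete, the sketch below computes one prediction by hand in base R: the intercept plus the sum of each coefficient times its feature value. The coefficients are copied from the printed model; the flight itself is hypothetical, and the three category indices are simply set to 0 for illustration.

```r
# Coefficients copied from the printed model above.
coefs <- c(
  "(Intercept)" = 11.778682025, month = -0.046258441, hour = -0.150788013,
  dayofweek = -0.235037195, carrier_cat = 0.147276224, depdelay = 0.903219105,
  origin_cat = -0.005712402, dest_cat = -0.023306845, distance = 0.001430704
)

# Hypothetical flight: month 6, day of week 3, departing at 840 (hhmm),
# departure delay 20, distance 1000; *_cat indices are placeholders.
dep_time <- 840
x <- c(
  "(Intercept)" = 1, month = 6, hour = floor(dep_time / 100), dayofweek = 3,
  carrier_cat = 0, depdelay = 20, origin_cat = 0, dest_cat = 0, distance = 1000
)

# Linear prediction; x is reordered by name to line up with coefs.
predicted_arrdelay <- sum(coefs * x[names(coefs)])
print(predicted_arrdelay)
```

Keep in mind that the model was trained only on rows with arrdelay >= 5, so a hand calculation like this is meaningful only for flights that arrive late.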

Summary

sparklyr lets you analyze big data on Amazon S3 smoothly from R. You can build a Spark cluster easily with Cloudera Director. sparklyr turns Spark into a backend for dplyr, so you can create tidy data from huge messy data, plot complex maps from big data the same way as from small data, and build a predictive model from big data with MLlib. I believe sparklyr helps all R users perform exploratory data analysis on large-scale data faster and more easily. Give it a try!
