RからSparkのデータを処理するsparklyr

今回は、sparklyrを使ってアメリカのフライト情報について、可視化、予測モデルの構築を行います。

地図の可視化はこちらのブログを参考にしました。 http://flowingdata.com/2011/05/11/how-to-map-connections-with-great-circles/

もし、sparklyrに興味をもったなら、公式ドキュメントから始めるといいでしょう。 もしくは、Cloudera DirectorでSparkクラスターを簡単につくり、それとsparklyrをつなげても良いでしょう。(今回はこれを元にしています) Cloudera Blog.

また、sparklyrを使う上で、チートシートが非常に役に立ちます。

このドキュメントでは以下のテーブルを使います:

これらのテーブルはHueなどを使って、別途作成しておいてください。テーブル作成のためのSQLはこちらを参考にしてください。

まず、先程立ち上げたクラスタのゲートウェイサーバにRStudio Serverが入っています。ポートが開放されている場合は、<gateway-server-host>:8787でブラウザからアクセスできます。 お手持ちのブラウザで<gateway-server-host>:8787を開いてください。そうでなければ8787番のポートをsshなどでフォワードし、localhost:8787にアクセスします。ブラウザからRStudio Serverにアクセスできます。スクリプトの設定だと、ユーザ名rsuserパスワードclouderaでRStudioにログインできます。

Sparkに接続する

sparklyr を使いApache Sparkクラスタに接続します。今回のコードは別途入れておいたSpark 2.0を使っています。

# Load libraries
library(ggplot2)
library(maps)
library(geosphere)
## Loading required package: sp
library(sparklyr)
library(dplyr)
## 
## Attaching package: 'dplyr'
## The following objects are masked from 'package:stats':
## 
##     filter, lag
## The following objects are masked from 'package:base':
## 
##     intersect, setdiff, setequal, union
# Configure cluster
config <- spark_config()
config$spark.driver.cores   <- 4
config$spark.executor.cores <- 4
config$spark.executor.memory <- "4G"
#spark_home <- "/opt/cloudera/parcels/CDH/lib/spark"
#spark_version <- "1.6.2"
spark_home <- "/opt/cloudera/parcels/SPARK2/lib/spark2"
spark_version <- "2.0.0"
sc <- spark_connect(master="yarn-client", version=spark_version, config=config, spark_home=spark_home)

S3にあるテーブルのデータを読み込みプロットする

まずは、airlines_bi_pqというHiveのテーブルにあるフライト数を年毎に集計します。 元データは、こちらからダウンロードできます。

airlines <- tbl(sc, "airlines_bi_pq")
airlines
## Source:   query [1.235e+08 x 30]
## Database: spark connection master=yarn-client app=sparklyr local=FALSE
## 
##     year month   day dayofweek dep_time crs_dep_time arr_time crs_arr_time
##    <int> <int> <int>     <int>    <int>        <int>    <int>        <int>
## 1   2006     6     3         6      840          830     1006         1007
## 2   2006     6     4         7      830          830      958         1007
## 3   2006     6     5         1      827          830     1004         1007
## 4   2006     6     6         2      830          830     1006         1007
## 5   2006     6     7         3      831          830     1005         1007
## 6   2006     6     8         4      826          830      958         1007
## 7   2006     6     9         5      826          830      958         1007
## 8   2006     6    10         6      845          830     1011         1007
## 9   2006     6    11         7      828          830      950         1007
## 10  2006     6    12         1      829          830      956         1007
## # ... with 1.235e+08 more rows, and 22 more variables: carrier <chr>,
## #   flight_num <int>, tail_num <int>, actual_elapsed_time <int>,
## #   crs_elapsed_time <int>, airtime <int>, arrdelay <int>, depdelay <int>,
## #   origin <chr>, dest <chr>, distance <int>, taxi_in <int>,
## #   taxi_out <int>, cancelled <int>, cancellation_code <chr>,
## #   diverted <int>, carrier_delay <int>, weather_delay <int>,
## #   nas_delay <int>, security_delay <int>, late_aircraft_delay <int>,
## #   date_yyyymm <chr>
airline_counts_by_year <- airlines %>% group_by(year) %>% summarise(count=n()) %>% collect
airline_counts_by_year %>% tbl_df %>% print(n=nrow(.))
## # A tibble: 22 × 2
##     year   count
##    <int>   <dbl>
## 1   1990 5270893
## 2   2003 6488540
## 3   2007 7453215
## 4   2006 7141922
## 5   1988 5202096
## 6   1997 5411843
## 7   1994 5180048
## 8   2004 7129270
## 9   1991 5076925
## 10  1996 5351983
## 11  1989 5041200
## 12  1998 5384721
## 13  1987 1311826
## 14  1995 5327435
## 15  2001 5967780
## 16  1992 5092157
## 17  2005 7140596
## 18  2000 5683047
## 19  2008 7009728
## 20  1999 5527884
## 21  2002 5271359
## 22  1993 5070501

これをグラフにプロットしてみましょう。

g <- ggplot(airline_counts_by_year, aes(x=year, y=count))
g <- g + geom_line(
  colour = "magenta",
  linetype = 1,
  size = 0.8
)
g <- g + xlab("Year")
g <- g + ylab("Flight number")
g <- g + ggtitle("US flights")
plot(g)

2001-2003年のフライト数を見てみる

2002年のフライト数が激減しているのがわかります。何が起こったのでしょうか?2001年〜2003年のフライト数を見てみましょう。

airline_counts_by_month <- airlines %>% filter(year>= 2001 & year<=2003) %>% group_by(year, month) %>% summarise(count=n()) %>% collect

g <- ggplot(
  airline_counts_by_month, 
  aes(x=as.Date(sprintf("%d-%02d-01", airline_counts_by_month$year, airline_counts_by_month$month)), y=count)
  )
g <- g + geom_line(
  colour = "magenta",
  linetype = 1,
  size = 0.8
)
g <- g + xlab("Year/Month")
g <- g + ylab("Flight number")
g <- g + ggtitle("US flights")
plot(g)

2001年の9月以降、フライト数が激減しているのがグラフから読み取れます。これは、9.11の影響を受けて以降2002年の間、フライト数が減少していると考えられます。この記事ではこれ以上原因については深く追求はしませんが、このように探索的な分析を行うことで、普段と異なるデータの傾向から、何が起こったのかを紐解くことができます。

フライトデータを年、キャリア、出発地、到着地で集計する

次に、キャリア・年ごとのフライト数を集計してみます。

flights <- airlines %>% group_by(year, carrier, origin, dest) %>% summarise(count=n()) %>% collect
flights
## Source: local data frame [123,737 x 5]
## Groups: year, carrier, origin [24,636]
## 
##     year carrier origin  dest count
##    <int>   <chr>  <chr> <chr> <dbl>
## 1   2006      US    TPA   CLT  2754
## 2   2006      DL    ATL   GSP  1206
## 3   2006      DL    RSW   ATL  2294
## 4   1998      NW    DTW   SDF  1117
## 5   2006      DL    LAX   CVG  1296
## 6   2006      DL    BOS   SLC   710
## 7   2006      DL    SJU   JFK   571
## 8   2000      DL    ATL   PWM   727
## 9   1988      DL    GEG   SLC  1051
## 10  2006      OH    BDL   RSW   351
## # ... with 123,727 more rows
airports <- tbl(sc, "airports_new_pq") %>% collect

その中から、2007年のアメリカン航空のフライト情報抽出します。

flights_aa <- flights %>% filter(year==2007) %>% filter(carrier=="AA") %>% arrange(count)
flights_aa
## Source: local data frame [460 x 5]
## Groups: year, carrier, origin [82]
## 
##     year carrier origin  dest count
##    <int>   <chr>  <chr> <chr> <dbl>
## 1   2007      AA    BOS   JFK     1
## 2   2007      AA    MIA   FLL     1
## 3   2007      AA    BOS   SDF     1
## 4   2007      AA    MIA   XNA     1
## 5   2007      AA    BNA   IAD     1
## 6   2007      AA    XNA   MIA     2
## 7   2007      AA    OGG   ORD     8
## 8   2007      AA    ORD   OGG     8
## 9   2007      AA    EGE   LGA    17
## 10  2007      AA    MTJ   ORD    17
## # ... with 450 more rows

アメリカン航空のフライトのマップを出力する

では、2007年のアメリカン航空のフライトを地図上に可視化してみましょう。フィルターするデータを変えれば他の航空会社でもプロット可能です。

# draw map with line of AA
xlim <- c(-171.738281, -56.601563)
ylim <- c(12.039321, 71.856229)

# Color settings
#pal <- colorRampPalette(c("#f2f2f2", "red"))
pal <- colorRampPalette(c("#333333", "white", "#1292db"))
colors <- pal(100)

#map("world", col="#f2f2f2", fill=TRUE, bg="white", lwd=0.05, xlim=xlim, ylim=ylim)
map("world", col="#6B6363", fill=TRUE, bg="#000000", lwd=0.05, xlim=xlim, ylim=ylim)

#carriers <- unique(flights$carrier)

maxcnt <- max(flights_aa$count)
for (j in 1:length(flights_aa$carrier)) {
  air1 <- airports[airports$iata == flights_aa[j,]$origin,]
  air2 <- airports[airports$iata == flights_aa[j,]$dest,]
  
  inter <- gcIntermediate(c(air1[1,]$longitude, air1[1,]$latitude), c(air2[1,]$longitude, air2[1,]$latitude), n=100, addStartEnd=TRUE)
  colindex <- round( (flights_aa[j,]$count / maxcnt) * length(colors) )
  
  lines(inter, col=colors[colindex], lwd=0.8)
}

遅延時間を予測する線形回帰モデルを学習する

最後に、MLlibの線形回帰モデルを使い、遅延を予測するモデルを構築してみましょう。

なお、sparklyrでカテゴリカルモデルを扱う際には、ft_string_indexerを使い変換しておきます。これを用いることで、文字列のデータをインデックスに変換します。

# build predictive model with linear regression
partitions <- airlines %>%
  filter(arrdelay >= 5) %>%
  sdf_mutate(
       carrier_cat = ft_string_indexer(carrier),
       origin_cat = ft_string_indexer(origin),
       dest_cat = ft_string_indexer(dest)
  ) %>%
  mutate(hour = floor(dep_time/100)) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)
fit <- partitions$training %>%
   ml_linear_regression(
     response = "arrdelay",
     features = c(
        "month", "hour", "dayofweek", "carrier_cat", "depdelay", "origin_cat", "dest_cat", "distance"
       )
    )
## * Dropped 44822 rows with 'na.omit' (22519745 => 22474923)
fit
## Call: ml_linear_regression(., response = "arrdelay", features = c("month", "hour", "dayofweek", "carrier_cat", "depdelay", "origin_cat", "dest_cat", "distance"))
## 
## Coefficients:
##  (Intercept)        month         hour    dayofweek  carrier_cat 
## 11.778682025 -0.046258441 -0.150788013 -0.235037195  0.147276224 
##     depdelay   origin_cat     dest_cat     distance 
##  0.903219105 -0.005712402 -0.023306845  0.001430704
summary(fit)
## Call: ml_linear_regression(., response = "arrdelay", features = c("month", "hour", "dayofweek", "carrier_cat", "depdelay", "origin_cat", "dest_cat", "distance"))
## 
## Deviance Residuals: (approximate):
##       Min        1Q    Median        3Q       Max 
## -1195.316    -7.940    -2.055     4.552   652.969 
## 
## Coefficients:
##                Estimate  Std. Error   t value  Pr(>|t|)    
## (Intercept)  1.1779e+01  1.6486e-02   714.468 < 2.2e-16 ***
## month       -4.6258e-02  9.8378e-04   -47.021 < 2.2e-16 ***
## hour        -1.5079e-01  7.4880e-04  -201.373 < 2.2e-16 ***
## dayofweek   -2.3504e-01  1.7646e-03  -133.195 < 2.2e-16 ***
## carrier_cat  1.4728e-01  7.2567e-04   202.951 < 2.2e-16 ***
## depdelay     9.0322e-01  8.7020e-05 10379.398 < 2.2e-16 ***
## origin_cat  -5.7124e-03  9.8910e-05   -57.753 < 2.2e-16 ***
## dest_cat    -2.3307e-02  9.4674e-05  -246.179 < 2.2e-16 ***
## distance     1.4307e-03  6.4905e-06   220.431 < 2.2e-16 ***
## ---
## Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
## 
## R-Squared: 0.8333
## Root Mean Squared Error: 16.33

このように、線形回帰モデルが学習され、どの特徴量がどういった係数か、ということがわかります。

まとめ

この記事では、sparklyrを使ってAmazon S3にあるアメリカの航空データの可視化、および予測モデルの構築を行いました。sparklyrを使うことでRに馴染み深い方にはいつもと同じ感覚でS3のデータに対して分析を行うことができます。また、Cloudera Directorを使うことで簡単にSparkクラスタを構築することができるので、是非試してみてください。