If you’re interested in the details, read this document from the beginning; otherwise, skip this part and go directly to the “All flight data” section.

Importing data

Download raw data

The next chunk downloads the bz2-compressed CSV file for each year from 1987 to 2008 and re-compresses each one as a zip file.

mkdir original_data
cd original_data

for i in {1987..2008}; do wget -q http://stat-computing.org/dataexpo/2009/"$i".csv.bz2; done

for i in {1987..2008}; do bunzip2 "$i".csv.bz2; zip "$i".csv.zip "$i".csv; rm "$i".csv; done
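Because wget -q suppresses wget's output, a failed download is easy to miss. A minimal R sanity check before importing, assuming the original_data folder and the <year>.csv.zip names produced by the loop above. missing_archives() is a hypothetical helper, not part of disk.frame:

```r
# Hypothetical helper: list the expected yearly zip archives absent from `dir`
missing_archives <- function(dir = "original_data", years = 1987:2008) {
  expected <- paste0(years, ".csv.zip")
  # setdiff() keeps the expected names that list.files() did not find
  setdiff(expected, list.files(dir))
}

# An empty result (character(0)) means all 22 archives are present
missing_archives()
```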

Import csv and merge

library(disk.frame)
library(dplyr)  # dplyr verbs (group_by, summarize) and %>% used below
list.files("original_data",
           pattern = "\\.csv",
           full.names = TRUE) %>%
  head(10) %>%  # only 10 files so that we can test faster
  purrr::map(~ csv_to_disk.frame(.x,
                                 outdir = paste0("tmp", readr::parse_number(.x)),
                                 shardby = c("Year", "Month"),
                                 overwrite = TRUE)) %>%
  rbindlist.disk.frame(outdir = "flights_all_sharded.df", by_chunk_id = TRUE)

Although the data is sharded by Year and Month, so that each yearly file has 12 distinct shard keys, some folders contain fewer than 12 fst files. I didn’t expect clashes of the hash key to be a problem, but they are happening for some reason. The contents of 3 folders are shown below. In 1990 there are 10 files, because two chunks each hold two months. In 1993 there are also 10 files, with 3 months clustered in a single fst file. 1996 (and some other folders) has 12 fst files, as expected.

ls -lh tmp199{0,3,6}
## tmp1990:
## total 142M
## -rw-r--r-- 1 alper alper 23M Jun 13 15:39 14.fst
## -rw-r--r-- 1 alper alper 25M Jun 13 15:39 16.fst
## -rw-r--r-- 1 alper alper 13M Jun 13 15:39 19.fst
## -rw-r--r-- 1 alper alper 13M Jun 13 15:39 20.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 21.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 44.fst
## -rw-r--r-- 1 alper alper 13M Jun 13 15:39 58.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 60.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 63.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 7.fst
## 
## tmp1993:
## total 138M
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 16.fst
## -rw-r--r-- 1 alper alper 35M Jun 13 15:39 17.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 21.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 25.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 33.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 37.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 42.fst
## -rw-r--r-- 1 alper alper 11M Jun 13 15:39 60.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 62.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 7.fst
## 
## tmp1996:
## total 187M
## -rw-r--r-- 1 alper alper 15M Jun 13 15:39 13.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 14.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 20.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 22.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 27.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 2.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 33.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 36.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 41.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 4.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 63.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 8.fst
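Clashes like these may be less surprising than they look. If the 12 Year/Month keys of one file were assigned to n chunks independently and uniformly at random, the chance that at least two months share a chunk is the birthday-problem probability. The sketch below makes two assumptions. The first is that disk.frame's hashing behaves like a uniform random assignment. The second is that there are about 64 chunks, inferred only from the largest chunk id listed above (63). Under these assumptions the probability comes out at roughly two in three, so several of the 10 yearly folders having fewer than 12 files would not be unusual.

```r
# Birthday problem: probability that k keys hashed uniformly at random
# into n buckets produce at least one collision
collision_prob <- function(k, n) {
  if (k <= 1) return(0)
  if (k > n) return(1)
  # P(no collision) = n/n * (n-1)/n * ... * (n-k+1)/n
  1 - prod((n - 0:(k - 1)) / n)
}

# 12 months per yearly file, assuming roughly 64 chunks
collision_prob(12, 64)   # about 0.667
```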

Clean up the directories created for the individual file imports (I wish rbindlist.disk.frame did not require duplicating the data into a new folder).

rm -r tmp????

Use the flight data

flights_df <- disk.frame("flights_all_sharded.df/")
flights_df
## path: "flights_all_sharded.df/"
## nchunks: 49
## nrow: 47925064
## ncol: 29

Some fst files are missing from the directory: chunk ids 1, 5, 6, 31, 35, 38, 39, 40, 43, 47, 49, 50, 56 and 59 are absent from the sequence 1 to 63, which leaves the 49 chunks reported above.

list.files("flights_all_sharded.df/")
##  [1] "10.fst" "11.fst" "12.fst" "13.fst" "14.fst" "15.fst" "16.fst"
##  [8] "17.fst" "18.fst" "19.fst" "2.fst"  "20.fst" "21.fst" "22.fst"
## [15] "23.fst" "24.fst" "25.fst" "26.fst" "27.fst" "28.fst" "29.fst"
## [22] "3.fst"  "30.fst" "32.fst" "33.fst" "34.fst" "36.fst" "37.fst"
## [29] "4.fst"  "41.fst" "42.fst" "44.fst" "45.fst" "46.fst" "48.fst"
## [36] "51.fst" "52.fst" "53.fst" "54.fst" "55.fst" "57.fst" "58.fst"
## [43] "60.fst" "61.fst" "62.fst" "63.fst" "7.fst"  "8.fst"  "9.fst"
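The warnings further down only mention chunk ids up to 49, which matches nchunks: 49, while files 51.fst to 63.fst exist on disk. One plausible explanation for the undercounts is that chunks are visited by id from 1 to nchunks, so existing files with larger ids are never read. This is a hypothesis, not something confirmed from the disk.frame source. The R sketch below checks a folder against that idea. chunk_id_report() is a hypothetical helper that only parses file names:

```r
# Hypothetical helper: compare chunk ids present on disk with the ids
# 1..nchunks that a loop over nchunks would visit
chunk_id_report <- function(files) {
  # chunk ids are the numeric part of "<id>.fst"
  ids <- sort(as.integer(sub("\\.fst$", "", files)))
  if (length(ids) == 0) stop("no .fst files given")
  nchunks <- length(ids)
  list(
    nchunks = nchunks,
    # ids between 1 and the largest id that have no file
    missing = setdiff(seq_len(max(ids)), ids),
    # existing ids above nchunks, never reached by a 1..nchunks loop
    beyond_nchunks = ids[ids > nchunks]
  )
}

# Usage (reads only file names):
# chunk_id_report(list.files("flights_all_sharded.df/", pattern = "\\.fst$"))
```

For the listing above this would report the 14 missing ids and the 11 existing ids from 51 to 63.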

two-stage group_by

by carrier

flights_df %>%
  group_by(UniqueCarrier) %>% # per chunk
  summarize(count = n()) %>%  
  collect %>%  
  group_by(UniqueCarrier) %>% # after collect
  summarize(count = sum(count)) %>% 
  arrange(UniqueCarrier) %>% 
  knitr::kable()  # printing the result directly did not render well in HTML, so kable is used
UniqueCarrier    count
AA             5263987
AS              795393
CO             3187682
DL             6247363
EA              701362
HP             1434364
ML (1)           56231
NW             3502394
PA (1)          240546
PI              672802
PS               70563
TW             1918283
UA             4731504
US             5752829
WN             3428952
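The two-stage pattern above is only correct if every row lives in exactly one chunk and every chunk is actually read. A base-R toy sketch, with no disk.frame involved and with data and a chunk split made up purely for illustration. It shows that summing per-chunk counts reproduces the direct count, and that silently skipping one chunk undercounts every group it touches:

```r
# Toy data, made up for illustration: 10 flights with a carrier code
flights <- data.frame(UniqueCarrier = c("AA", "AA", "DL", "UA", "AA",
                                        "DL", "UA", "UA", "AA", "DL"),
                      stringsAsFactors = FALSE)

# Stand-in for fst chunks: split the rows into 3 arbitrary chunks
chunks <- split(flights, rep(1:3, length.out = nrow(flights)))

# Stage 1 (like group_by/summarize before collect): counts within each chunk
count_chunk <- function(ch) {
  as.data.frame(table(ch$UniqueCarrier), stringsAsFactors = FALSE)
}
per_chunk <- lapply(chunks, count_chunk)

# Stage 2 (like group_by/summarize after collect): sum the partial counts
combine <- function(parts) {
  aggregate(Freq ~ Var1, data = do.call(rbind, parts), FUN = sum)
}

direct  <- count_chunk(flights)      # counts computed on all rows at once
full    <- combine(per_chunk)        # same counts as `direct`
partial <- combine(per_chunk[-3])    # chunk 3 never read: counts too low
```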

The actual counts, computed with awk directly from the same 10 CSV files (1987 to 1996), are:

$ awk -F"," '{carrier[$9]++}END{for(carr in carrier){print carr,carrier[carr]}}' original_data/{1987..1996}.csv | sort

AA 6655735
AS 994764
CO 4017862
DL 7884309
EA 919785
HP 1809250
ML (1) 70622
NW 4410734
PA (1) 316167
PI 873957
PS 83617
TW 2421955
UA 5938506
UniqueCarrier 10
US 7295919
WN 4231882

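There’s a significant discrepancy between the results: every disk.frame count is lower than the actual one. Both are shown in a single table below (the "UniqueCarrier 10" line in the awk output counts the header row of each of the 10 files and is left out).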

UniqueCarrier  disk.frame   actual
AA                5263987  6655735
AS                 795393   994764
CO                3187682  4017862
DL                6247363  7884309
EA                 701362   919785
HP                1434364  1809250
ML (1)              56231    70622
NW                3502394  4410734
PA (1)             240546   316167
PI                 672802   873957
PS                  70563    83617
TW                1918283  2421955
UA                4731504  5938506
US                5752829  7295919
WN                3428952  4231882
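The size of the gap can be checked with the numbers from the table above. In this small R check the vectors are copied from the table. The disk.frame counts add up to 38004255 and the actual counts to 47925064, which is exactly the nrow reported for flights_df. That suggests the missing rows are on disk but are not reached by the group_by.

```r
# Per-carrier counts copied from the table above
carrier   <- c("AA", "AS", "CO", "DL", "EA", "HP", "ML (1)", "NW",
               "PA (1)", "PI", "PS", "TW", "UA", "US", "WN")
diskframe <- c(5263987, 795393, 3187682, 6247363, 701362, 1434364, 56231,
               3502394, 240546, 672802, 70563, 1918283, 4731504, 5752829,
               3428952)
actual    <- c(6655735, 994764, 4017862, 7884309, 919785, 1809250, 70622,
               4410734, 316167, 873957, 83617, 2421955, 5938506, 7295919,
               4231882)

sum(diskframe)                 # 38004255 rows seen by the two-stage group_by
sum(actual)                    # 47925064 rows in the CSV files (headers excluded)
sum(actual) - sum(diskframe)   # 9920809 rows not counted
# share of each carrier's rows that the group_by found
setNames(round(diskframe / actual, 3), carrier)
```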

by year

Let’s count rows per year this time. This is where the missing fst files mentioned above come in: besides a warning that the shard keys differ from the group_by key, the group_by operation emits a warning for each missing fst file.

flights_df %>%
  group_by(Year) %>% # per chunk
  summarize(count = n()) %>%  
  collect %>%  
  group_by(Year) %>% # after collect
  summarize(count = sum(count)) %>% 
  arrange(Year) %>% 
  knitr::kable()
## Warning in group_by.disk.frame(., Year): The shardkeys '' are NOT identical
## to shardby = 'Year'. The group_by operation is applied WITHIN each chunk,
## hence the results may not be as expected. To address this issue, you
## can rechunk(df, shardby = your_group_keys) which can be computationally
## expensive. Otherwise, you may use a second stage summary to obtain the
## desired result.
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 1.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 5.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 6.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 31.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 35.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 38.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 39.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 40.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 43.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 47.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 49.fst does not exist; returning an empty data.table
Year count
1987 1311826
1988 3427459
1989 4186900
1990 3936502
1991 3845285
1992 3404132
1993 4270194
1994 4318042
1995 4408509
1996 4895406

Actual data by year

$ wc -l original_data/{1987..1996}.csv
   1311827 original_data/1987.csv
   5202097 original_data/1988.csv
   5041201 original_data/1989.csv
   5270894 original_data/1990.csv
   5076926 original_data/1991.csv
   5092158 original_data/1992.csv
   5070502 original_data/1993.csv
   5180049 original_data/1994.csv
   5327436 original_data/1995.csv
   5351984 original_data/1996.csv
  47925074 total

There’s a discrepancy again. Here is the combined table (the actual counts include one extra line per file, the header line, so only 1987 matches).

Year   disk.frame    actual
1987      1311826   1311827
1988      3427459   5202097
1989      4186900   5041201
1990      3936502   5270894
1991      3845285   5076926
1992      3404132   5092158
1993      4270194   5070502
1994      4318042   5180049
1995      4408509   5327436
1996      4895406   5351984
total   see below  47925074
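Subtracting the header line from the wc -l counts makes the comparison exact. A quick R check using only the numbers in the table above shows two things. First, 1987 matches. Second, the disk.frame column sums to 38004255, the same total as the per-carrier counts. This suggests that both group_by calls miss the same rows.

```r
# Per-year counts copied from the table above
year      <- 1987:1996
diskframe <- c(1311826, 3427459, 4186900, 3936502, 3845285,
               3404132, 4270194, 4318042, 4408509, 4895406)
wc_lines  <- c(1311827, 5202097, 5041201, 5270894, 5076926,
               5092158, 5070502, 5180049, 5327436, 5351984)

# wc -l counts one header line per file, so drop it
actual_rows <- wc_lines - 1
data.frame(year, missing = actual_rows - diskframe)
sum(diskframe)     # 38004255, same as the per-carrier total
sum(actual_rows)   # 47925064, same as nrow(flights_df)
```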

per chunk operation

The code below is expected to calculate the total number of rows, but it throws an error, which seems to be caused by the empty list elements (the missing fst files).

delayed(flights_df, ~nrow(.x)) %>% collect_list() %>% purrr::reduce(`+`)

Let’s demonstrate how the missing fst files affect per-chunk operations. Here imap calculates nrow for each chunk, and the results for the first 6 chunks are printed.

disk.frame::imap.disk.frame(flights_df, ~nrow(.x), lazy = F) %>% 
  head()
## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//1.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//5.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//6.fst does not exist; returning an empty data.table
## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored

## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored

## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored

## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//31.fst does not exist; returning an empty
## data.table
## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//35.fst does not exist; returning an empty
## data.table
## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//38.fst does not exist; returning an empty
## data.table
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//39.fst does not exist; returning an empty
## data.table
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//40.fst does not exist; returning an empty
## data.table
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//43.fst does not exist; returning an empty
## data.table
## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//47.fst does not exist; returning an empty
## data.table
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//49.fst does not exist; returning an empty
## data.table
## [[1]]
## [1] 0
## 
## [[2]]
## [1] 1280908
## 
## [[3]]
## [1] 405604
## 
## [[4]]
## [1] 859163
## 
## [[5]]
## [1] 0
## 
## [[6]]
## [1] 0
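Whatever the exact cause of the reduce error, a per-chunk result list like the one printed above can be totalled defensively. A minimal base-R sketch, assuming the results arrive as a list of single numbers in which an empty chunk might appear as NULL or a zero-length element. safe_total() is a hypothetical helper, not a disk.frame function. Even a correct total only covers the chunks that are actually visited.

```r
# Per-chunk row counts as printed above for the first six chunks
per_chunk <- list(0, 1280908, 405604, 859163, 0, 0)

# Hypothetical helper: treat NULL or zero-length elements as 0 before summing,
# so an empty element cannot turn the total into numeric(0) as 1 + NULL would
safe_total <- function(x) {
  sum(vapply(x, function(v) if (length(v) == 0) 0 else as.numeric(v),
             numeric(1)))
}

safe_total(per_chunk)           # 2545675
safe_total(list(10, NULL, 5))   # 15
```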