If you’re interested in the details, read this document from the beginning; otherwise, skip this part and go straight to the “All flight data” section.
The next chunk shows how the data is downloaded and converted into zip files.
mkdir original_data
cd original_data
for i in {1987..2008}; do wget -q http://stat-computing.org/dataexpo/2009/"$i".csv.bz2; done
for i in {1987..2008}; do bunzip2 "$i".csv.bz2; zip "$i".csv.zip "$i".csv; rm "$i".csv; done
library(disk.frame)
list.files("original_data",
           pattern = "*.csv",
           full.names = TRUE) %>%
  head(10) %>% # only 10 files so that we can test faster
  purrr::map(~ csv_to_disk.frame(.x,
                                 outdir = paste0("tmp", readr::parse_number(.x)),
                                 shardby = c("Year", "Month"),
                                 overwrite = TRUE)) %>%
  rbindlist.disk.frame(outdir = "flights_all_sharded.df", by_chunk_id = TRUE)
Although the data is sharded by year and month, some folders contain fewer than 12 fst files. A clash of hashing keys shouldn’t be a problem, yet it is happening for some reason. The contents of 3 folders are shown below. In 1990, two chunks each hold two months, leaving 10 files. In 1993, 3 months are clustered in a single fst file, again leaving 10 files. 1996 (like some other folders) has 12 fst files, as expected.
ls -lh tmp199{0,3,6}
## tmp1990:
## total 142M
## -rw-r--r-- 1 alper alper 23M Jun 13 15:39 14.fst
## -rw-r--r-- 1 alper alper 25M Jun 13 15:39 16.fst
## -rw-r--r-- 1 alper alper 13M Jun 13 15:39 19.fst
## -rw-r--r-- 1 alper alper 13M Jun 13 15:39 20.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 21.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 44.fst
## -rw-r--r-- 1 alper alper 13M Jun 13 15:39 58.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 60.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 63.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 7.fst
##
## tmp1993:
## total 138M
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 16.fst
## -rw-r--r-- 1 alper alper 35M Jun 13 15:39 17.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 21.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 25.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 33.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 37.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 42.fst
## -rw-r--r-- 1 alper alper 11M Jun 13 15:39 60.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 62.fst
## -rw-r--r-- 1 alper alper 12M Jun 13 15:39 7.fst
##
## tmp1996:
## total 187M
## -rw-r--r-- 1 alper alper 15M Jun 13 15:39 13.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 14.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 20.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 22.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 27.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 2.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 33.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 36.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 41.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 4.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 63.fst
## -rw-r--r-- 1 alper alper 16M Jun 13 15:39 8.fst
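A quick way to spot the folders with clustered months is to count the fst files in each one. The sketch below is only an illustration: short_shards is a hypothetical helper (not part of disk.frame), and it assumes that one fst file per month, i.e. 12 per yearly folder, is what a clash-free shard would produce.

```shell
# Hypothetical helper: print every folder that holds fewer .fst files than expected.
# Usage: short_shards EXPECTED_COUNT DIR...
short_shards() {
  expected=$1
  shift
  for d in "$@"; do
    n=0
    for f in "$d"/*.fst; do
      # The glob stays literal when nothing matches, so test for existence.
      [ -e "$f" ] && n=$((n + 1))
    done
    [ "$n" -lt "$expected" ] && echo "$d $n"
  done
  return 0
}
```

Before the temporary folders are removed, it could be called as short_shards 12 tmp???? to list the affected years.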
Clean up the directories created for the individual file imports (I wish rbindlist.disk.frame did not require duplicating the folders).
rm -r tmp????
flights_df <- disk.frame("flights_all_sharded.df/")
flights_df
## path: "flights_all_sharded.df/"
## nchunks: 49
## nrow: 47925064
## ncol: 29
Some fst files are missing from the directory. In this case 1, 5, 6, 31, 35, 38, 39, 40, 43, 47, 49, 50, 56 and 59 are absent from the sequence 1 to 63.
list.files("flights_all_sharded.df/")
## [1] "10.fst" "11.fst" "12.fst" "13.fst" "14.fst" "15.fst" "16.fst"
## [8] "17.fst" "18.fst" "19.fst" "2.fst" "20.fst" "21.fst" "22.fst"
## [15] "23.fst" "24.fst" "25.fst" "26.fst" "27.fst" "28.fst" "29.fst"
## [22] "3.fst" "30.fst" "32.fst" "33.fst" "34.fst" "36.fst" "37.fst"
## [29] "4.fst" "41.fst" "42.fst" "44.fst" "45.fst" "46.fst" "48.fst"
## [36] "51.fst" "52.fst" "53.fst" "54.fst" "55.fst" "57.fst" "58.fst"
## [43] "60.fst" "61.fst" "62.fst" "63.fst" "7.fst" "8.fst" "9.fst"
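The gaps in the sequence can also be listed mechanically instead of by eye. This is a minimal sketch, assuming chunk files are named with a numeric ID followed by .fst as in the listing above; missing_chunks is a hypothetical helper, not a disk.frame function.

```shell
# Hypothetical helper: print the chunk IDs between 1 and the largest ID present
# that have no corresponding <id>.fst file in the given directory.
missing_chunks() {
  for f in "$1"/*.fst; do
    [ -e "$f" ] && basename "$f" .fst
  done | awk '
    /^[0-9]+$/ { seen[$1 + 0] = 1; if ($1 + 0 > max) max = $1 + 0 }
    END {
      for (i = 1; i <= max; i++)
        if (!(i in seen)) printf "%s%d", (n++ ? " " : ""), i
      if (n) print ""
    }'
}
```

Running missing_chunks flights_all_sharded.df should reproduce the list of absent IDs given above, provided the directory still has the same contents.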
flights_df %>%
group_by(UniqueCarrier) %>% # per chunk
summarize(count = n()) %>%
collect %>%
group_by(UniqueCarrier) %>% # after collect
summarize(count = sum(count)) %>%
arrange(UniqueCarrier) %>%
knitr::kable() # the output did not print properly in HTML, so kable is used
| UniqueCarrier | count |
|---|---|
| AA | 5263987 |
| AS | 795393 |
| CO | 3187682 |
| DL | 6247363 |
| EA | 701362 |
| HP | 1434364 |
| ML (1) | 56231 |
| NW | 3502394 |
| PA (1) | 240546 |
| PI | 672802 |
| PS | 70563 |
| TW | 1918283 |
| UA | 4731504 |
| US | 5752829 |
| WN | 3428952 |
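The actual counts, taken directly from the CSV files, are (the UniqueCarrier line counts the header row of each of the 10 files):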
$ awk -F"," '{carrier[$9]++}END{for(carr in carrier){print carr,carrier[carr]}}' original_data/{1987..1996}.csv | sort
AA 6655735
AS 994764
CO 4017862
DL 7884309
EA 919785
HP 1809250
ML (1) 70622
NW 4410734
PA (1) 316167
PI 873957
PS 83617
TW 2421955
UA 5938506
UniqueCarrier 10
US 7295919
WN 4231882
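The awk count above also tallies the header row of every file, which is where the UniqueCarrier line comes from. A minimal variant that skips the headers is sketched here; it assumes each file starts with exactly one header row and that the carrier is in column 9, as in the command above. The name carrier_counts is hypothetical.

```shell
# Hypothetical helper: count rows per carrier (column 9) across CSV files,
# skipping the first line of each file (FNR restarts at 1 for every file).
carrier_counts() {
  awk -F, 'FNR > 1 { n[$9]++ } END { for (c in n) print c, n[c] }' "$@" | sort
}
```

It would be invoked the same way as the original command, for example carrier_counts original_data/1987.csv original_data/1988.csv.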
There’s a significant discrepancy between the results. Here are both sets of counts in a single table:
| UniqueCarrier | disk.frame shard | actual |
|---|---|---|
| AA | 5263987 | 6655735 |
| AS | 795393 | 994764 |
| CO | 3187682 | 4017862 |
| DL | 6247363 | 7884309 |
| EA | 701362 | 919785 |
| HP | 1434364 | 1809250 |
| ML (1) | 56231 | 70622 |
| NW | 3502394 | 4410734 |
| PA (1) | 240546 | 316167 |
| PI | 672802 | 873957 |
| PS | 70563 | 83617 |
| TW | 1918283 | 2421955 |
| UA | 4731504 | 5938506 |
| US | 5752829 | 7295919 |
| WN | 3428952 | 4231882 |
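To put a number on the gap, the share of rows that survive in the sharded disk.frame can be computed per carrier. The snippet below is a small sketch of that arithmetic; retained_pct is a hypothetical helper that takes the disk.frame count and the actual count as arguments.

```shell
# Hypothetical helper: percentage of actual rows present in the sharded count.
# Usage: retained_pct SHARD_COUNT ACTUAL_COUNT
retained_pct() {
  awk -v s="$1" -v a="$2" 'BEGIN { printf "%.1f\n", 100 * s / a }'
}
```

For AA, retained_pct 5263987 6655735 prints 79.1, so roughly a fifth of that carrier's rows are not counted.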
Let’s count rows per year this time. The missing fst files were listed above because the group_by operation warns about them. Note that the warnings below name only the missing IDs up to 49, which is the nchunks value reported for flights_df.
flights_df %>%
group_by(Year) %>% # per chunk
summarize(count = n()) %>%
collect %>%
group_by(Year) %>% # after collect
summarize(count = sum(count)) %>%
arrange(Year) %>%
knitr::kable()
## Warning in group_by.disk.frame(., Year): The shardkeys '' are NOT identical
## to shardby = 'Year'. The group_by operation is applied WITHIN each chunk,
## hence the results may not be as expected. To address this issue, you
## can rechunk(df, shardby = your_group_keys) which can be computationally
## expensive. Otherwise, you may use a second stage summary to obtain the
## desired result.
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 1.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 5.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 6.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 31.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 35.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 38.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 39.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 40.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 43.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 47.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(df, .x): The chunk flights_all_sharded.df//
## 49.fst does not exist; returning an empty data.table
| Year | count |
|---|---|
| 1987 | 1311826 |
| 1988 | 3427459 |
| 1989 | 4186900 |
| 1990 | 3936502 |
| 1991 | 3845285 |
| 1992 | 3404132 |
| 1993 | 4270194 |
| 1994 | 4318042 |
| 1995 | 4408509 |
| 1996 | 4895406 |
Actual line counts by year (each count includes one header line):
$ wc -l original_data/{1987..1996}.csv
1311827 original_data/1987.csv
5202097 original_data/1988.csv
5041201 original_data/1989.csv
5270894 original_data/1990.csv
5076926 original_data/1991.csv
5092158 original_data/1992.csv
5070502 original_data/1993.csv
5180049 original_data/1994.csv
5327436 original_data/1995.csv
5351984 original_data/1996.csv
47925074 total
There’s a discrepancy again. Here’s the combined table (each actual count includes one extra line, the header line of the file):
| Year | disk.frame shard | actual |
|---|---|---|
| 1987 | 1311826 | 1311827 |
| 1988 | 3427459 | 5202097 |
| 1989 | 4186900 | 5041201 |
| 1990 | 3936502 | 5270894 |
| 1991 | 3845285 | 5076926 |
| 1992 | 3404132 | 5092158 |
| 1993 | 4270194 | 5070502 |
| 1994 | 4318042 | 5180049 |
| 1995 | 4408509 | 5327436 |
| 1996 | 4895406 | 5351984 |
| total | see below | 47925074 |
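Since wc -l counts the header line of every file, the actual number of data rows is the line count minus one per file. A minimal sketch of that adjustment follows, assuming each CSV has exactly one header line and ends with a newline; data_rows is a hypothetical helper.

```shell
# Hypothetical helper: total number of data rows across CSV files,
# i.e. the line count of each file minus its single header line.
data_rows() {
  total=0
  for f in "$@"; do
    lines=$(wc -l < "$f" | tr -d ' ')   # strip padding some wc versions add
    total=$((total + lines - 1))
  done
  echo "$total"
}
```

Applied to the totals above, 47925074 lines minus 10 header lines gives 47925064 data rows, which equals the nrow that disk.frame printed for flights_df even though the per-year counts fall short.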
The code below is expected to calculate the total number of rows, but it throws an error because of the empty list elements (the missing fst files).
delayed(flights_df, ~nrow(.x)) %>% collect_list() %>% purrr::reduce(`+`)
Let’s demonstrate how the missing fst files affect per-chunk operations. Let’s use imap to calculate nrow per chunk and print the results for the first 6 chunks.
disk.frame::imap.disk.frame(flights_df, ~nrow(.x), lazy = FALSE) %>%
  head()
## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//1.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//5.fst does not exist; returning an empty data.table
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//6.fst does not exist; returning an empty data.table
## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored
## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored
## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored
## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//31.fst does not exist; returning an empty
## data.table
## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//35.fst does not exist; returning an empty
## data.table
## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//38.fst does not exist; returning an empty
## data.table
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//39.fst does not exist; returning an empty
## data.table
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//40.fst does not exist; returning an empty
## data.table
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//43.fst does not exist; returning an empty
## data.table
## Warning in expand_(path, Sys.getenv("R_FS_HOME") != "" || is_windows()):
## '.Random.seed' is not an integer vector but of type 'NULL', so ignored
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//47.fst does not exist; returning an empty
## data.table
## Warning in get_chunk.disk.frame(.x, ii, keep = keep_future): The chunk
## flights_all_sharded.df//49.fst does not exist; returning an empty
## data.table
## [[1]]
## [1] 0
##
## [[2]]
## [1] 1280908
##
## [[3]]
## [1] 405604
##
## [[4]]
## [1] 859163
##
## [[5]]
## [1] 0
##
## [[6]]
## [1] 0