Now let’s compare the results when we don’t do sharding.

Import csv and merge

library(disk.frame)
library(dplyr)  # for the %>% pipe and the group_by/summarize/collect verbs used below

I chose 128 chunks because there are 10 years of data with 12 months each (except 1987), so roughly 120 files. Let’s round that up to the nearest 2^n, which is 128.

Actually, what I intended to do was to chunk each file into an arbitrary number of chunks and then, during rbindlist.disk.frame, ask for the result to be bound into 128 chunks, but rbindlist.disk.frame doesn’t have such an option.

On the other hand, if I had chosen 12 chunks at the csv_to_disk.frame stage, I would have forced rbindlist.disk.frame to merge the data into only 12 chunks. Thus I picked 128 at the beginning.
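
A possible workaround, if I’m not mistaken, is disk.frame’s rechunk(), which changes the number of chunks of an existing disk.frame, applied to the merged result created below. A rough sketch, not run here:

# rough sketch (not run): rechunk the merged disk.frame to the desired chunk count,
# assuming rechunk() accepts a target nchunks
merged_df <- disk.frame("flights_all_notsharded.df/")
rechunk(merged_df, nchunks = 128)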

list.files("original_data", 
           pattern="*.csv", 
           full.names = T) %>% 
  head(10) %>%  # only 10 files so that we can test faster
  purrr::map(~ csv_to_disk.frame(.x, 
                                outdir=paste0("tmp", readr::parse_number(.x)),
                                nchunks = 128,
                                overwrite=T)) %>% 
  rbindlist.disk.frame(outdir = "flights_all_notsharded.df", by_chunk_id = T)

Now each (temporary) folder has 128 files:

$ find -iname "*.fst" | grep tmp | cut -d'/' -f2 | sort | uniq -c
    128 tmp1987
    128 tmp1988
    128 tmp1989
    128 tmp1990
    128 tmp1991
    128 tmp1992
    128 tmp1993
    128 tmp1994
    128 tmp1995
    128 tmp1996

Clean up the directories from the individual file imports (I wish there were no need to duplicate the folders when rbindlist.disk.frame is used):

rm -r tmp????
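
For completeness, the same cleanup from R would be roughly:

# delete the temporary tmpYYYY folders left over from the per-file imports
unlink(list.files(pattern = "^tmp[0-9]{4}$"), recursive = TRUE)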

Use the flight data

flights_df <- disk.frame("flights_all_notsharded.df/")
flights_df
## path: "flights_all_notsharded.df/"
## nchunks: 128
## nrow: 47925064
## ncol: 29

No files are missing from the merged folder:

ls flights_all_notsharded.df/ | wc -l
## 128
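
The same check can be done from R by counting the chunk files:

# count the .fst chunk files from R; should also give 128
length(list.files("flights_all_notsharded.df", pattern = "\\.fst$"))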

two-stage group_by

by carrier

flights_df %>%
  group_by(UniqueCarrier) %>% # per chunk
  summarize(count = n()) %>%  
  collect %>%  
  group_by(UniqueCarrier) %>% # after collect
  summarize(count = sum(count)) %>% 
  arrange(UniqueCarrier) %>% 
  knitr::kable()  # somehow the default print was problematic in the html output, thus kable is used
| UniqueCarrier |   count |
|:--------------|--------:|
| AA            | 6655735 |
| AS            |  994764 |
| CO            | 4017862 |
| DL            | 7884309 |
| EA            |  919785 |
| HP            | 1809250 |
| ML (1)        |   70622 |
| NW            | 4410734 |
| PA (1)        |  316167 |
| PI            |  873957 |
| PS            |   83617 |
| TW            | 2421955 |
| UA            | 5938506 |
| US            | 7295919 |
| WN            | 4231882 |

Let’s compare with the actual data and with the sharded data (the current results are indicated as not_sharded):

| UniqueCarrier | disk.frame shard | disk.frame not_sharded |  actual |
|:--------------|-----------------:|-----------------------:|--------:|
| AA            |          5263987 |                 6655735 | 6655735 |
| AS            |           795393 |                  994764 |  994764 |
| CO            |          3187682 |                 4017862 | 4017862 |
| DL            |          6247363 |                 7884309 | 7884309 |
| EA            |           701362 |                  919785 |  919785 |
| HP            |          1434364 |                 1809250 | 1809250 |
| ML (1)        |            56231 |                   70622 |   70622 |
| NW            |          3502394 |                 4410734 | 4410734 |
| PA (1)        |           240546 |                  316167 |  316167 |
| PI            |           672802 |                  873957 |  873957 |
| PS            |            70563 |                   83617 |   83617 |
| TW            |          1918283 |                 2421955 | 2421955 |
| UA            |          4731504 |                 5938506 | 5938506 |
| US            |          5752829 |                 7295919 | 7295919 |
| WN            |          3428952 |                 4231882 | 4231882 |
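
The same two-stage trick works for other statistics as long as the per-chunk pieces can be combined; a mean, for example, needs a per-chunk sum and count. A rough sketch, not run here, assuming the ArrDelay column of the airline data:

flights_df %>%
  group_by(UniqueCarrier) %>%  # per chunk
  summarize(delay_sum = sum(ArrDelay, na.rm = TRUE),  # combinable pieces: sum and non-NA count
            delay_n   = sum(!is.na(ArrDelay))) %>%
  collect %>%
  group_by(UniqueCarrier) %>%  # after collect
  summarize(mean_delay = sum(delay_sum) / sum(delay_n)) %>%
  arrange(UniqueCarrier)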

by year

Let’s do counts per year this time.

flights_df %>%
  group_by(Year) %>% # per chunk
  summarize(count = n()) %>%  
  collect %>%  
  group_by(Year) %>% # after collect
  summarize(count = sum(count)) %>% 
  arrange(Year) %>% 
  knitr::kable()
| Year |   count |
|:-----|--------:|
| 1987 | 1311826 |
| 1988 | 5202096 |
| 1989 | 5041200 |
| 1990 | 5270893 |
| 1991 | 5076925 |
| 1992 | 5092157 |
| 1993 | 5070501 |
| 1994 | 5180048 |
| 1995 | 5327435 |
| 1996 | 5351983 |

Here’s the combined table (note 1: the actual data has one extra line, the header line, per file; note 2: the current results are indicated as not_sharded).

| Year  | disk.frame shard | disk.frame not_sharded |   actual |
|:------|-----------------:|-----------------------:|---------:|
| 1987  |          1311826 |                 1311826 |  1311827 |
| 1988  |          3427459 |                 5202096 |  5202097 |
| 1989  |          4186900 |                 5041200 |  5041201 |
| 1990  |          3936502 |                 5270893 |  5270894 |
| 1991  |          3845285 |                 5076925 |  5076926 |
| 1992  |          3404132 |                 5092157 |  5092158 |
| 1993  |          4270194 |                 5070501 |  5070502 |
| 1994  |          4318042 |                 5180048 |  5180049 |
| 1995  |          4408509 |                 5327435 |  5327436 |
| 1996  |          4895406 |                 5351983 |  5351984 |
| total |      gives error |    47925064 (see below) | 47925074 |

The actual counts include one additional line, the header, per file, so the total has 10 extra lines; there is no real discrepancy.
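
A quick check of that arithmetic:

47925074 - 47925064  # one header line for each of the 10 csv files
## [1] 10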

per-chunk operation

Let’s calculate the total number of rows:

delayed(flights_df, ~nrow(.x)) %>% collect_list() %>% purrr::reduce(`+`)
## [1] 47925064

Here’s the imap approach. No problems:

disk.frame::imap.disk.frame(flights_df, ~nrow(.x), lazy = F) %>% purrr::reduce(`+`)
## [1] 47925064
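
Yet another way to arrive at the same number is to loop over the chunks by hand; a sketch, assuming get_chunk() and nchunks() behave as I remember:

# sum the row counts chunk by chunk; should again give 47925064
purrr::map_dbl(seq_len(nchunks(flights_df)),
               ~ nrow(get_chunk(flights_df, .x))) %>%
  sum()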