Now let’s compare the results when we don’t do sharding.
library(disk.frame)
library(dplyr) # group_by(), summarize(), arrange() and the %>% pipe
I chose 128 chunks because there are 10 years of data with 12 months each (except 1987), which gives roughly 120. I rounded that to the nearest power of two, 128.
What I actually intended was to split each file into an arbitrary number of chunks and then, at the rbindlist.disk.frame step, ask for the result to be bound into 128 chunks. However, rbindlist.disk.frame didn't have such an option. On the other hand, if I had chosen 12 chunks at the csv_to_disk.frame stage, I would have forced rbindlist.disk.frame to merge the data into only 12 chunks. That is why I picked 128 at the beginning.
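The chunk-count arithmetic can be written out in R. This is only a sketch of the estimate above. It treats 1987 as a full year, as the "roughly 120" figure does, and it rounds on the log2 scale, which is one reasonable reading of "nearest 2^n".

```r
# Chunk-count estimate from the text: 10 years x 12 months each.
# Assumption: 1987 is counted as a full year, as in the "roughly 120" estimate.
n_years  <- 10
n_months <- 12
approx_chunks <- n_years * n_months   # 120

# Round to the nearest power of two on the log2 scale (log2(120) is about 6.91)
nchunks <- 2^round(log2(approx_chunks))
nchunks  # 128
```

Rounding up with ceiling() would also give 128 here, while floor() would give 64.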
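list.files("original_data",
           pattern = "\\.csv$", # a regular expression, not a shell glob
           full.names = TRUE) %>%
  head(10) %>% # only 10 files so that we can test faster
  # import each year's CSV into its own temporary folder, e.g. tmp1987
  purrr::map(~ csv_to_disk.frame(.x,
                                 outdir = paste0("tmp", readr::parse_number(.x)),
                                 nchunks = 128,
                                 overwrite = TRUE)) %>%
  # bind the chunks that share a chunk ID across years into one disk.frame
  rbindlist.disk.frame(outdir = "flights_all_notsharded.df", by_chunk_id = TRUE)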
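To make the by_chunk_id = TRUE step concrete, here is a small in-memory sketch. As I understand the option, chunk i of every input disk.frame ends up in chunk i of the output, so the result keeps the inputs' chunk count. The toy data and the helper make_chunks() are hypothetical. This mimics the idea with plain lists of data.frames and is not how disk.frame implements it.

```r
# Hypothetical stand-in for chunked data: each "year" is a list of data.frame chunks.
make_chunks <- function(year, nchunks, rows_per_chunk = 2) {
  lapply(seq_len(nchunks), function(i) {
    data.frame(Year = year, chunk_id = i, row = seq_len(rows_per_chunk))
  })
}

nchunks <- 4
years <- list(make_chunks(1987, nchunks),
              make_chunks(1988, nchunks),
              make_chunks(1989, nchunks))

# Bind by chunk ID: chunk i of the result is the rbind of chunk i from every year.
combined <- lapply(seq_len(nchunks), function(i) {
  do.call(rbind, lapply(years, function(chunks) chunks[[i]]))
})

length(combined)     # 4: same number of chunks as each input
nrow(combined[[1]])  # 6: 2 rows from each of the 3 years
```

If each yearly import had used 12 chunks instead, the same logic would leave only 12 chunks in the combined result. That is the constraint described above.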
Now each (temporary) folder has 128 files:
$ find -iname "*.fst" | grep tmp | cut -d'/' -f2 | sort | uniq -c
128 tmp1987
128 tmp1988
128 tmp1989
128 tmp1990
128 tmp1991
128 tmp1992
128 tmp1993
128 tmp1994
128 tmp1995
128 tmp1996
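If you prefer to stay in R, the same per-folder count can be sketched with list.files(). The helper count_fst_per_tmp_dir() is hypothetical, and it only approximates the shell pipeline. It keeps top-level folders whose names start with tmp, whereas grep tmp matches anywhere in the path.

```r
# Count .fst files per temporary import folder (tmp1987, tmp1988, ...), roughly like
# find -iname "*.fst" | grep tmp | cut -d'/' -f2 | sort | uniq -c
count_fst_per_tmp_dir <- function(root = ".") {
  # paths come back relative to root, e.g. "tmp1987/1.fst"
  fst_files <- list.files(root, pattern = "\\.fst$", recursive = TRUE,
                          ignore.case = TRUE)
  top_dir <- sub("/.*$", "", fst_files)   # first path component
  table(top_dir[grepl("^tmp", top_dir)])  # files per tmp folder
}
```

Running count_fst_per_tmp_dir() from the project folder should list 128 for each of tmp1987 to tmp1996.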
Clean up the directories created by the individual file imports. (I wish rbindlist.disk.frame didn't require this duplication of folders.)
rm -r tmp????
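The cleanup can also be done from R. This is a minimal sketch that assumes the temporary folders sit directly under the given root. remove_tmp_dirs() is a hypothetical helper and not part of disk.frame.

```r
# Remove the per-year import folders (tmp1987, ..., tmp1996) without the shell.
# Sys.glob() expands "tmp????" like the shell does: "tmp" plus exactly four characters.
remove_tmp_dirs <- function(root = ".") {
  tmp_dirs <- Sys.glob(file.path(root, "tmp????"))
  unlink(tmp_dirs, recursive = TRUE)  # returns 0 on success
}
```

Calling remove_tmp_dirs() from the project folder should then have the same effect as rm -r tmp????.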
flights_df <- disk.frame("flights_all_notsharded.df/")
flights_df
## path: "flights_all_notsharded.df/"
## nchunks: 128
## nrow: 47925064
## ncol: 29
No files are missing from the merged folder:
ls flights_all_notsharded.df/ | wc -l
## 128
flights_df %>%
group_by(UniqueCarrier) %>% # per chunk
summarize(count = n()) %>%
collect %>%
group_by(UniqueCarrier) %>% # after collect
summarize(count = sum(count)) %>%
arrange(UniqueCarrier) %>%
knitr::kable() # the tibble output didn't render well in HTML, so I used kable
UniqueCarrier | count |
---|---|
AA | 6655735 |
AS | 994764 |
CO | 4017862 |
DL | 7884309 |
EA | 919785 |
HP | 1809250 |
ML (1) | 70622 |
NW | 4410734 |
PA (1) | 316167 |
PI | 873957 |
PS | 83617 |
TW | 2421955 |
UA | 5938506 |
US | 7295919 |
WN | 4231882 |
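The two group_by/summarize steps are needed because the first one runs inside each chunk. A carrier whose rows are spread over several chunks therefore gets one partial count per chunk. The sketch below reproduces the two-stage count in base R on hypothetical toy data. Here split() stands in for disk.frame's chunking, so this illustrates the idea rather than disk.frame's internals.

```r
# Hypothetical toy data: one carrier's rows can land in several chunks.
flights <- data.frame(UniqueCarrier = c("AA", "AA", "DL", "AA", "DL", "UA", "AA", "UA"))
chunk_id <- rep(1:3, length.out = nrow(flights))
chunks <- split(flights, chunk_id)

# Stage 1: count within each chunk (what the first group_by + summarize does)
per_chunk <- do.call(rbind, lapply(chunks, function(ch) {
  as.data.frame(table(UniqueCarrier = ch$UniqueCarrier),
                responseName = "count", stringsAsFactors = FALSE)
}))

# Stage 2: after collect, add up the partial counts per carrier
total <- aggregate(count ~ UniqueCarrier, data = per_chunk, FUN = sum)
total
```

Stopping after stage 1 would leave several rows for AA, one per chunk it appears in. Stage 2 merges them into a single total that matches table(flights$UniqueCarrier).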
Let’s compare with the actual data and with the sharded data (the current results are in the not_sharded column):
UniqueCarrier | disk.frame shard | disk.frame not_sharded | actual |
---|---|---|---|
AA | 5263987 | 6655735 | 6655735 |
AS | 795393 | 994764 | 994764 |
CO | 3187682 | 4017862 | 4017862 |
DL | 6247363 | 7884309 | 7884309 |
EA | 701362 | 919785 | 919785 |
HP | 1434364 | 1809250 | 1809250 |
ML (1) | 56231 | 70622 | 70622 |
NW | 3502394 | 4410734 | 4410734 |
PA (1) | 240546 | 316167 | 316167 |
PI | 672802 | 873957 | 873957 |
PS | 70563 | 83617 | 83617 |
TW | 1918283 | 2421955 | 2421955 |
UA | 4731504 | 5938506 | 5938506 |
US | 5752829 | 7295919 | 7295919 |
WN | 3428952 | 4231882 | 4231882 |
Let’s do counts per year this time.
flights_df %>%
group_by(Year) %>% # per chunk
summarize(count = n()) %>%
collect %>%
group_by(Year) %>% # after collect
summarize(count = sum(count)) %>%
arrange(Year) %>%
knitr::kable()
Year | count |
---|---|
1987 | 1311826 |
1988 | 5202096 |
1989 | 5041200 |
1990 | 5270893 |
1991 | 5076925 |
1992 | 5092157 |
1993 | 5070501 |
1994 | 5180048 |
1995 | 5327435 |
1996 | 5351983 |
Here’s the combined table (note 1: the actual data has one extra line per file, the header line; note 2: the current results are in the not_sharded column).
Year | disk.frame shard | disk.frame not_sharded | actual |
---|---|---|---|
1987 | 1311826 | 1311826 | 1311827 |
1988 | 3427459 | 5202096 | 5202097 |
1989 | 4186900 | 5041200 | 5041201 |
1990 | 3936502 | 5270893 | 5270894 |
1991 | 3845285 | 5076925 | 5076926 |
1992 | 3404132 | 5092157 | 5092158 |
1993 | 4270194 | 5070501 | 5070502 |
1994 | 4318042 | 5180048 | 5180049 |
1995 | 4408509 | 5327435 | 5327436 |
1996 | 4895406 | 5351983 | 5351984 |
total | gives error | 47925064 (see below) | 47925074 |
Each actual count includes one header line per file, so across the 10 files the actual total is exactly 10 lines higher (47925074 - 47925064 = 10). There is no discrepancy.
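The header-line explanation can be checked with a few lines of arithmetic, using the not_sharded and actual columns from the combined table above.

```r
# Per-year row counts copied from the combined table (not_sharded vs. actual line counts)
not_sharded <- c(1311826, 5202096, 5041200, 5270893, 5076925,
                 5092157, 5070501, 5180048, 5327435, 5351983)
actual      <- c(1311827, 5202097, 5041201, 5270894, 5076926,
                 5092158, 5070502, 5180049, 5327436, 5351984)
names(not_sharded) <- names(actual) <- 1987:1996

actual - not_sharded             # 1 for every year: the CSV header line
sum(not_sharded)                 # 47925064
sum(actual) - sum(not_sharded)   # 10 = one header line x 10 files
```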
Let’s calculate the total number of rows:
delayed(flights_df, ~nrow(.x)) %>% collect_list() %>% purrr::reduce(`+`)
## [1] 47925064
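The delayed/collect_list/reduce pipeline is a small map-reduce. It computes one nrow() per chunk and then folds the per-chunk numbers together with +. Here is the same pattern in base R on hypothetical toy chunks, with Reduce() in place of purrr::reduce().

```r
# Hypothetical toy chunks standing in for disk.frame chunks
chunks <- list(data.frame(x = 1:3), data.frame(x = 1:5), data.frame(x = 1:2))

per_chunk_rows <- lapply(chunks, nrow)    # map step: one row count per chunk
total_rows <- Reduce(`+`, per_chunk_rows) # reduce step: add the counts together
total_rows  # 10
```

Only the per-chunk counts are collected into memory, never the chunks themselves. That keeps this approach cheap on the full data.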
Here’s the imap.disk.frame approach. No problems here either:
disk.frame::imap.disk.frame(flights_df, ~nrow(.x), lazy = FALSE) %>% purrr::reduce(`+`)
## [1] 47925064