Arrow

Harold Nelson

2023-03-26

Setup

library(tidyverse)
## ── Attaching packages ─────────────────────────────────────── tidyverse 1.3.2 ──
## ✔ ggplot2 3.4.1     ✔ purrr   0.3.4
## ✔ tibble  3.2.1     ✔ dplyr   1.1.1
## ✔ tidyr   1.2.1     ✔ stringr 1.4.1
## ✔ readr   2.1.2     ✔ forcats 0.5.2
## ── Conflicts ────────────────────────────────────────── tidyverse_conflicts() ──
## ✖ dplyr::filter() masks stats::filter()
## ✖ dplyr::lag()    masks stats::lag()
library(arrow)
## 
## Attaching package: 'arrow'
## 
## The following object is masked from 'package:utils':
## 
##     timestamp
library(dbplyr, warn.conflicts = FALSE)
library(tictoc)

Get Data

dir.create("data", showWarnings = FALSE)

curl::multi_download(
  "https://r4ds.s3.us-west-2.amazonaws.com/seattle-library-checkouts.csv",
  "data/seattle-library-checkouts.csv"
)
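
The CSV is big, so re-running this document would download it again. A minimal guard (a sketch, not from the text; csv_path is just a local convenience name) skips the download when the file already exists:

csv_path <- "data/seattle-library-checkouts.csv"
if (!file.exists(csv_path)) {
  curl::multi_download(
    "https://r4ds.s3.us-west-2.amazonaws.com/seattle-library-checkouts.csv",
    csv_path
  )
}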

Open the CSV with Arrow

This is not the original code from the text; the ISBN column type is supplied here through CsvConvertOptions.

opts <- CsvConvertOptions$create(col_types = schema(ISBN = string()))

seattle_csv <- open_dataset(
  sources = "data/seattle-library-checkouts.csv",
  format = "csv",
  convert_options = opts
)
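
For reference, if I recall the chapter correctly, the text passes the ISBN type straight to open_dataset() through a col_types argument; that did not work with the arrow version I had installed, which is why I went through CsvConvertOptions above. A sketch of the book-style call, assuming a recent enough arrow release:

seattle_csv <- open_dataset(
  sources = "data/seattle-library-checkouts.csv",
  col_types = schema(ISBN = string()),  # assumption: forwarded to the CSV reader in newer arrow versions
  format = "csv"
)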

Glimpse

glimpse(seattle_csv)
## FileSystemDataset with 1 csv file
## 41,389,465 rows x 12 columns
## $ UsageClass      <string> "Physical", "Physical", "Digital", "Physical", "Physi…
## $ CheckoutType    <string> "Horizon", "Horizon", "OverDrive", "Horizon", "Horizo…
## $ MaterialType    <string> "BOOK", "BOOK", "EBOOK", "BOOK", "SOUNDDISC", "BOOK",…
## $ CheckoutYear     <int64> 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016,…
## $ CheckoutMonth    <int64> 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6,…
## $ Checkouts        <int64> 1, 1, 1, 1, 1, 1, 1, 1, 4, 1, 1, 2, 3, 2, 1, 3, 2, 3,…
## $ Title           <string> "Super rich : a guide to having it all / Russell Simm…
## $ ISBN            <string> "", "", "", "", "", "", "", "", "", "", "", "", "", "…
## $ Creator         <string> "Simmons, Russell", "Barclay, James, 1965-", "Tim Par…
## $ Subjects        <string> "Self realization, Conduct of life, Attitude Psycholo…
## $ Publisher       <string> "Gotham Books,", "Pyr,", "Random House, Inc.", "Dial …
## $ PublicationYear <string> "c2011.", "2010.", "2015", "2005.", "c2004.", "c2005.…
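
Arrow infers these column types by peeking at the beginning of the file; only ISBN was forced to string above. The inferred schema can also be inspected on its own:

seattle_csv$schema   # full Arrow schema for the 12 columns, including ISBN as string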

Counts

counts <- seattle_csv |> 
  count(CheckoutYear, wt = Checkouts) |> 
  arrange(CheckoutYear) |> 
  collect()

str(counts)
## tibble [18 × 2] (S3: tbl_df/tbl/data.frame)
##  $ CheckoutYear: int [1:18] 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 ...
##  $ n           : int [1:18] 3798685 6599318 7126627 8438486 9135167 8608966 8321732 8163046 9057096 9136081 ...
counts
## # A tibble: 18 × 2
##    CheckoutYear       n
##           <int>   <int>
##  1         2005 3798685
##  2         2006 6599318
##  3         2007 7126627
##  4         2008 8438486
##  5         2009 9135167
##  6         2010 8608966
##  7         2011 8321732
##  8         2012 8163046
##  9         2013 9057096
## 10         2014 9136081
## 11         2015 9084179
## 12         2016 9021051
## 13         2017 9231648
## 14         2018 9149176
## 15         2019 9199083
## 16         2020 6053717
## 17         2021 7361031
## 18         2022 7001989

Create Parquet Files

pq_path <- "data/seattle-library-checkouts"

seattle_csv |>
  group_by(CheckoutYear) |>
  write_dataset(path = pq_path, format = "parquet")

This code originally failed for me. I fixed the problem after reading a discussion on the GitHub issues page for this chapter. See https://github.com/hadley/r4ds/issues/1374

pq_files <- tibble(
  files = list.files(pq_path, recursive = TRUE),
  size_MB = file.size(file.path(pq_path, files)) / 1024^2
)

pq_files %>% 
  summarize(sum(size_MB))
## # A tibble: 1 × 1
##   `sum(size_MB)`
##            <dbl>
## 1          4218.
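
Because of the group_by(CheckoutYear) before write_dataset(), the data are written as Hive-style partition directories, one per checkout year, and those are what the recursive listing above walks through. A quick way to see the layout:

list.files(pq_path)                          # one directory per year, e.g. "CheckoutYear=2005"
head(list.files(pq_path, recursive = TRUE))  # the parquet file(s) inside each partition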

Open the Parquet Files with Arrow

seattle_pq <- open_dataset(pq_path)
str(seattle_pq)
## Classes 'FileSystemDataset', 'Dataset', 'ArrowObject', 'R6' <FileSystemDataset>
##   Inherits from: <Dataset>
##   Public:
##     .:xp:.: externalptr
##     .class_title: function () 
##     .unsafe_delete: function () 
##     class_title: function () 
##     clone: function (deep = FALSE) 
##     files: active binding
##     filesystem: active binding
##     format: active binding
##     initialize: function (xp) 
##     metadata: active binding
##     NewScan: function () 
##     num_cols: active binding
##     num_rows: active binding
##     pointer: function () 
##     print: function (...) 
##     schema: active binding
##     set_pointer: function (xp) 
##     ToString: function () 
##     type: active binding
##     WithSchema: function (schema)

Create a Query

query <- seattle_pq |> 
  filter(CheckoutYear >= 2018, MaterialType == "BOOK") |>
  group_by(CheckoutYear, CheckoutMonth) |>
  summarize(TotalCheckouts = sum(Checkouts)) |>
  arrange(CheckoutYear, CheckoutMonth)

query
## FileSystemDataset (query)
## CheckoutYear: int32
## CheckoutMonth: int64
## TotalCheckouts: int64
## 
## * Grouped by CheckoutYear
## * Sorted by CheckoutYear [asc], CheckoutMonth [asc]
## See $.data for the source Arrow object

Note that, unlike dplyr on an ordinary data frame, nothing has been computed yet: query is just a lazy description of the work to be done, and Arrow only executes it when we ask for the result.
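
Both compute() and collect() trigger that execution: compute() keeps the result in Arrow memory as a Table, while collect() brings it back to R as a tibble.

query |> compute()   # executes the query, result stays in Arrow as a Table
query |> collect()   # executes the query, result comes back as an R tibble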

Use the Query

result <- query |> collect()
str(result)
## gropd_df [58 × 3] (S3: grouped_df/tbl_df/tbl/data.frame)
##  $ CheckoutYear  : int [1:58] 2018 2018 2018 2018 2018 2018 2018 2018 2018 2018 ...
##  $ CheckoutMonth : int [1:58] 1 2 3 4 5 6 7 8 9 10 ...
##  $ TotalCheckouts: int [1:58] 355101 309813 344487 330988 318049 341825 351207 352977 319587 338497 ...
##  - attr(*, "groups")= tibble [5 × 2] (S3: tbl_df/tbl/data.frame)
##   ..$ CheckoutYear: int [1:5] 2018 2019 2020 2021 2022
##   ..$ .rows       : list<int> [1:5] 
##   .. ..$ : int [1:12] 1 2 3 4 5 6 7 8 9 10 ...
##   .. ..$ : int [1:12] 13 14 15 16 17 18 19 20 21 22 ...
##   .. ..$ : int [1:12] 25 26 27 28 29 30 31 32 33 34 ...
##   .. ..$ : int [1:12] 37 38 39 40 41 42 43 44 45 46 ...
##   .. ..$ : int [1:10] 49 50 51 52 53 54 55 56 57 58
##   .. ..@ ptype: int(0) 
##   ..- attr(*, ".drop")= logi TRUE
result
## # A tibble: 58 × 3
## # Groups:   CheckoutYear [5]
##    CheckoutYear CheckoutMonth TotalCheckouts
##           <int>         <int>          <int>
##  1         2018             1         355101
##  2         2018             2         309813
##  3         2018             3         344487
##  4         2018             4         330988
##  5         2018             5         318049
##  6         2018             6         341825
##  7         2018             7         351207
##  8         2018             8         352977
##  9         2018             9         319587
## 10         2018            10         338497
## # … with 48 more rows
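
One detail worth noting: summarize() drops only the last grouping variable, so result is still grouped by CheckoutYear (that is the grouped_df attribute visible in the str() output above). If the grouping is not wanted downstream, drop it after collecting:

result |> ungroup()   # plain ungrouped tibble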

Performance

Let’s start by running Hadley’s test.

First the csv file.

seattle_csv |> 
  filter(CheckoutYear == 2021, MaterialType == "BOOK") |>
  group_by(CheckoutMonth) |>
  summarize(TotalCheckouts = sum(Checkouts)) |>
  arrange(desc(CheckoutMonth)) |>
  collect() |> 
  system.time()
##    user  system elapsed 
##  12.165   0.978  11.546

Now the Parquet.

seattle_pq |> 
  filter(CheckoutYear == 2021, MaterialType == "BOOK") |>
  group_by(CheckoutMonth) |>
  summarize(TotalCheckouts = sum(Checkouts)) |>
  arrange(desc(CheckoutMonth)) |>
  collect() |> 
  system.time()
##    user  system elapsed 
##   0.270   0.052   0.059

Another Comparison

I suspected that this comparison gave Parquet an unfair advantage, because the query filters on CheckoutYear, the same variable the Parquet files are partitioned by.

I removed year from the filter and reran the comparison.

First the csv file.

seattle_csv |> 
  filter( MaterialType == "BOOK") |>
  group_by(CheckoutMonth) |>
  summarize(TotalCheckouts = sum(Checkouts)) |>
  arrange(desc(CheckoutMonth)) |>
  collect() |> 
  system.time()
##    user  system elapsed 
##  12.120   0.917  10.928

Now the Parquet.

seattle_pq |> 
  filter(MaterialType == "BOOK") |>
  group_by(CheckoutMonth) |>
  summarize(TotalCheckouts = sum(Checkouts)) |>
  arrange(desc(CheckoutMonth)) |>
  collect() |> 
  system.time()
##    user  system elapsed 
##   4.927   0.617   0.657

The Parquet query is still faster, but in user CPU time only by a factor of about 2.5 (12.1 s versus 4.9 s). The elapsed time remains far lower (under a second versus about eleven seconds), presumably because Arrow can scan the Parquet files with multiple threads.

The bottom line is that Parquet roughly halved both the storage requirement and the CPU time of a query, even without the year partitioning to help.
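
All of these timings come from a single system.time() call, so they are rough. A sketch for smoothing out run-to-run noise by repeating the Parquet query a few times (base R only; a package such as bench would do this more carefully):

# Run the Parquet query three times and average the elapsed times.
elapsed <- replicate(3, system.time(
  seattle_pq |>
    filter(MaterialType == "BOOK") |>
    group_by(CheckoutMonth) |>
    summarize(TotalCheckouts = sum(Checkouts)) |>
    collect()
)["elapsed"])
mean(elapsed)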

Graphics

Let’s try to plot directly from the Arrow dataset, without collecting it first.

seattle_csv %>% 
  ggplot(aes(x = Checkouts)) + geom_density() %>% 
  system.time()
## Error in `fortify()`:
## ! `data` must be a <data.frame>, or an object coercible by `fortify()`,
##   not an S3 object with class <FileSystemDataset>/<Dataset>/<ArrowObject>/<R6>.

Dataframe then Plot

tic()

seattle_pq %>% 
  select(Checkouts) %>% 
  filter(!is.na(Checkouts)) %>% 
  filter(Checkouts >= 1) %>% 
  collect() %>% 
  ggplot(aes(x = Checkouts)) +
  scale_x_log10() +
  geom_density() 

toc()
## 13.361 sec elapsed
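
Most of those 13 seconds go to collecting tens of millions of Checkouts values into R before ggplot2 ever sees them. A lighter pattern (a sketch, not from the text) is to aggregate inside Arrow and collect only the summary, for example total checkouts per year:

tic()

seattle_pq %>%
  count(CheckoutYear, wt = Checkouts) %>%   # aggregation happens in Arrow
  collect() %>%                             # only 18 rows come back to R
  ggplot(aes(x = CheckoutYear, y = n)) +
  geom_col()

toc()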