What is Apache Arrow?

Apache Arrow is a cross-language data toolkit; the {arrow} R package wraps its C++ library and lets you load and work with data in a variety of formats. Arrow is built around column-oriented storage, which enables very fast loading and searching of data.

What is it for?

Among the many applications of the {arrow} package, two of the most accessible are:

  • High-performance reading and writing of data files with multiple file formats and compression codecs, including built-in support for cloud storage
  • Analyzing and manipulating bigger-than-memory data with dplyr verbs

Some of the {arrow} use cases

  • Analyze, process, and write multi-file, larger-than-memory datasets (open_dataset(), write_dataset())
  • Read large CSV and JSON files with excellent speed and efficiency (read_csv_arrow(), read_json_arrow())
  • Read and write Parquet files (read_parquet(), write_parquet()), an efficient and widely used columnar format
  • Read and write Feather files (read_feather(), write_feather()), a format optimized for speed and interoperability (a quick sketch of these readers and writers follows this list)
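
Here is that sketch; the file name flights.csv is just a placeholder, and any delimited file with a header row would work the same way:

library(arrow)

# read a delimited file into an R data frame (a tibble by default)
flights <- read_csv_arrow("flights.csv")

# write it back out as Parquet and as Feather
write_parquet(flights, "flights.parquet")
write_feather(flights, "flights.feather")

# read the Parquet file back in
flights2 <- read_parquet("flights.parquet")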

What is Columnar Storage?

https://en.wikipedia.org/wiki/Column-oriented_DBMS

Most of us are familiar with row-oriented storage. In row-oriented storage, the unit of storage is a row: all of the data in a row is stored together.

In a column-oriented store, the unit of storage is a column: all of the data in a column is stored together. It turns out that for many analytical queries, column-oriented storage is faster to search and traverse.

In short, {arrow} lets you convert row-oriented data into column-oriented data and take advantage of these speed gains.

The big caveat: while a column-oriented store is faster to search, writing and updating the data is less efficient. But for data that isn't changing all the time, the trade-off can be worth it.

Arrow Datasets: access and work with many files at once.

Say you have a folder of .csv files, all with identical headers. Is there a way to work with all of these files at once?

{arrow} has a function called open_dataset() that will let us work with these files as if they were a single entity.

Let’s open all the files in data/training (about 20000 csv files):

library(arrow)
library(dplyr)
ds <- open_dataset("data/training", format = "csv", delim = "|")
ds
## FileSystemDataset with 20336 csv files
## HR: double
## O2Sat: double
## Temp: double
## SBP: double
## MAP: double
## DBP: double
## Resp: double
## EtCO2: double
## BaseExcess: double
## HCO3: double
## FiO2: double
## pH: double
## PaCO2: double
## SaO2: double
## AST: double
## BUN: double
## Alkalinephos: double
## Calcium: double
## Chloride: double
## Creatinine: double
## Bilirubin_direct: double
## Glucose: double
## Lactate: double
## Magnesium: double
## Phosphate: double
## Potassium: double
## Bilirubin_total: double
## TroponinI: double
## Hct: double
## Hgb: double
## PTT: double
## WBC: double
## Fibrinogen: double
## Platelets: double
## Age: double
## Gender: int64
## Unit1: double
## Unit2: double
## HospAdmTime: double
## ICULOS: int64
## SepsisLabel: int64

Getting our Dataset into Memory

We can load the entire dataset into memory using collect(). Depending on the total dataset size, this can be completely reasonable.

The following code works on my Mac with 16 GB of RAM, but it chugs on RStudio Cloud.

# don't run this code block on a low-memory computer
ds %>%
  collect() %>%
  head()
## # A tibble: 6 × 41
##      HR O2Sat  Temp   SBP   MAP   DBP  Resp EtCO2 BaseExcess  HCO3   FiO2     pH
##   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>      <dbl> <dbl>  <dbl>  <dbl>
## 1   NaN NaN     NaN   NaN NaN     NaN NaN     NaN        NaN   NaN NaN    NaN   
## 2    97  95     NaN    98  75.3   NaN  19     NaN        NaN   NaN NaN    NaN   
## 3    89  99     NaN   122  86     NaN  22     NaN        NaN   NaN NaN    NaN   
## 4    90  95     NaN   NaN NaN     NaN  30     NaN         24   NaN NaN      7.36
## 5   103  88.5   NaN   122  91.3   NaN  24.5   NaN        NaN   NaN   0.28 NaN   
## 6   110  91     NaN   NaN NaN     NaN  22     NaN        NaN   NaN NaN    NaN   
## # … with 29 more variables: PaCO2 <dbl>, SaO2 <dbl>, AST <dbl>, BUN <dbl>,
## #   Alkalinephos <dbl>, Calcium <dbl>, Chloride <dbl>, Creatinine <dbl>,
## #   Bilirubin_direct <dbl>, Glucose <dbl>, Lactate <dbl>, Magnesium <dbl>,
## #   Phosphate <dbl>, Potassium <dbl>, Bilirubin_total <dbl>, TroponinI <dbl>,
## #   Hct <dbl>, Hgb <dbl>, PTT <dbl>, WBC <dbl>, Fibrinogen <dbl>,
## #   Platelets <dbl>, Age <dbl>, Gender <int>, Unit1 <dbl>, Unit2 <dbl>,
## #   HospAdmTime <dbl>, ICULOS <int>, SepsisLabel <int>

collect() is a very important verb in {arrow}: none of the operations in a pipeline are actually evaluated until collect() (or compute()) is called.

This is because {arrow} uses lazy evaluation: it doesn't execute the dplyr pipeline until it has to produce results.

"Lazy evaluation" is a bit misleading, because the intermediate steps can feel instantaneous. What is really happening is that {arrow} is building up an efficient plan for querying the data, and only runs that plan when results are requested.

What a lazy package!

Under the hood, our Dataset is an R6 object. The cool thing is that we can work with it using dplyr like any other data source: even though we are really querying more than 20,000 separate csv files, it looks like we are querying a single table.

small_ds <- ds %>%
  filter(Age > 50) %>%
  filter(SepsisLabel == 1) %>%
  select(Age, Gender, SepsisLabel) %>%
  distinct()

small_ds
## FileSystemDataset (query)
## Age: double
## Gender: int64
## SepsisLabel: int64
## 
## See $.data for the source Arrow object

Remember, it’s not until we run collect() that Arrow does anything:

small_ds %>%
  collect()
## # A tibble: 1,274 × 3
##      Age Gender SepsisLabel
##    <dbl>  <int>       <int>
##  1  77.3      0           1
##  2  70.2      0           1
##  3  59.5      1           1
##  4  79.3      0           1
##  5  73.3      1           1
##  6  65.6      0           1
##  7  62.7      0           1
##  8  56.0      1           1
##  9  51.0      1           1
## 10  85.0      1           1
## # … with 1,264 more rows
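
collect() pulls the result into R as a tibble. Its sibling compute() also forces the query to run, but keeps the result as an Arrow Table in Arrow's memory instead of converting it to a data frame. A small sketch:

# compute() evaluates the query but leaves the result in Arrow memory
small_tbl <- small_ds %>% compute()
class(small_tbl)

# convert to a tibble only when we actually need it in R
small_tbl %>% collect()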

We can run mutate() on the entire dataset:

ds %>%
  mutate(age_days = Age * 365) %>%
  select(Age, Gender, age_days) %>%
  distinct() %>%
  collect() %>%
  head()
## # A tibble: 6 × 3
##     Age Gender age_days
##   <dbl>  <int>    <dbl>
## 1  75.9      0   27707.
## 2  83.1      0   30346.
## 3  45.8      0   16724.
## 4  64.2      1   23448.
## 5  70.8      0   25846.
## 6  49.8      0   18177

All of this computation happens inside the Arrow engine; only the final, collected result is pulled into R's memory.

Summarizing in {arrow}

group_by()/mutate() and group_by()/summarize() operations are now implemented in {arrow}.

# Note: this code block crashes a 4 GB RStudio Cloud instance
small_ds %>%
  group_by(Gender) %>%
  summarize(count=n()) %>%
  collect()
## # A tibble: 2 × 2
##   Gender count
##    <int> <int>
## 1      0   505
## 2      1   769

What about {vroom}?

The {vroom} package is another way to load up lots of files. It focuses on getting the data from files into memory as fast as possible.

Depending on your use case, it’s worth looking at {vroom}. But it does not do the on-disk processing that {arrow} does.
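
For comparison, here is a rough sketch of reading the same folder with {vroom}. It reads every file straight into a single in-memory tibble, so it needs enough RAM to hold all of the data at once:

library(vroom)

# gather all of the file paths and read them into one tibble in memory
files <- list.files("data/training", full.names = TRUE)
training <- vroom(files, delim = "|")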

Feather versus Parquet format

You may have heard of the Parquet format before. It is a columnar format that is widely used throughout industry to store data. A single Parquet file stores its data column by column in compressed chunks, and large datasets are typically written out as a directory containing many Parquet files. Parquet is designed for long-term storage, and compact, well-compressed files are part of its goal.

https://databricks.com/glossary/what-is-parquet

Apache Arrow also defines the Feather format (Feather version 2 is the Arrow IPC file format). Feather files are optimized for fast reads and writes rather than maximum compression, so they tend to be larger than Parquet files; depending on the data, they can be up to twice as large.

# write the whole Dataset back out in Feather format, then reopen it
write_dataset(ds, "data/training.feather", format = "feather")
ds_f <- open_dataset("data/training.feather", format = "feather")
ds_f %>%
  filter(Age > 70) %>%
  filter(Gender == 0) %>%
  select(HR, O2Sat, Gender, Age) %>%
  collect()
## # A tibble: 141,890 × 4
##       HR O2Sat Gender   Age
##    <dbl> <dbl>  <int> <dbl>
##  1   NaN   NaN      0  75.9
##  2    61    99      0  75.9
##  3    64    98      0  75.9
##  4    56   100      0  75.9
##  5    66    99      0  75.9
##  6    94   100      0  75.9
##  7    58    99      0  75.9
##  8    57   100      0  75.9
##  9    62   100      0  75.9
## 10    58    95      0  75.9
## # … with 141,880 more rows
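
For comparison, the same Dataset could be written out in Parquet format and reopened in exactly the same way. A sketch (the path data/training.parquet is just an example):

# write the same Dataset out as Parquet, then reopen it
write_dataset(ds, "data/training.parquet", format = "parquet")
ds_p <- open_dataset("data/training.parquet", format = "parquet")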

Partitioned Datasets

One way to enable even faster querying is to convert your data into a partitioned dataset: we split the files into separate directories based on the values of one or more grouping columns, so queries that filter on those columns only have to read the matching directories.
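
Here is a minimal sketch, using the Gender column from our Dataset as the partitioning variable (any column with a small number of distinct values would work):

# write the Dataset out as Parquet files, split into one directory per Gender value
write_dataset(ds, "data/training_partitioned",
              format = "parquet", partitioning = "Gender")

# the reopened Dataset knows about the partitions,
# so filters on Gender only touch the matching directories
ds_part <- open_dataset("data/training_partitioned", format = "parquet")
ds_part %>%
  filter(Gender == 0, Age > 70) %>%
  collect()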