Here we are going to see how to work with the R arrow package to make a CSV and Parquet backed database, or strictly speaking an arrow Dataset. This allows for larger-than-memory processing of data that is big enough to fit on your RAID array, but too big to safely process in RAM (as a rule of thumb you need at least 2x the size of the object free in RAM to do anything other than trivial processing). This class of data is often called “Medium Data” (where “Small” fits easily in RAM, and “Big” cannot even be stored locally), and it is an increasingly common scale for data scientists since disk storage (via RAIDs etc) is extremely cheap and still growing (basically keeping up with the scale of datasets being created over time), whilst RAM remains quite pricey and is not growing as fast.

Load what we need:

library(arrow)
## 
## Attaching package: 'arrow'
## The following object is masked from 'package:utils':
## 
##     timestamp
library(data.table)
library(foreach)
library(dplyr)
## 
## Attaching package: 'dplyr'
## The following objects are masked from 'package:data.table':
## 
##     between, first, last
## The following objects are masked from 'package:stats':
## 
##     filter, lag
## The following objects are masked from 'package:base':
## 
##     intersect, setdiff, setequal, union
library(microbenchmark)

Arrow Datasets

Arrow is supported by the Apache open source foundation (https://arrow.apache.org), and is an in-memory format for data that allows consistent operations between programs (no need to keep saving and loading the data as you switch between e.g. R and Python). I have spent a decent amount of time looking at low-effort methods for working on Medium Data, and my feeling is that Apache Arrow is the sweet spot. It is threaded by default, and for the most part you can trust the defaults (regarding fiddly things like chunk sizes etc). Arrow is well supported in R (arrow), Python (pyarrow), Julia (Arrow.jl) and many other languages, so it also meets the requirement to be cross-language and flexible. I prefer the speed (it is much faster in benchmarks), simplicity and flexibility of this system to setting up a MiniSQL database (or similar). Partly this is because it is easier to set up, but I also really dislike SQL syntax for data exploration.

In short, Arrow Datasets are a way to work with large on-disk objects and treat them like they are in memory (for the most part). The backend can be a number of different formats (two popular ones, CSV and Parquet, are discussed below). Once the virtual Dataset has been made, the R arrow package allows us to interact with it using the popular dplyr verbs (most of them exist). When using this workflow it is common to use the pipe operator %>% to send the result of one operation into the next. The clever part is that evaluation is lazy, so only the minimal outputs that are needed are extracted, i.e. if we only want certain rows and columns then the disk is only touched where absolutely necessary to extract this data; there is no expensive full read of each table into RAM.

CSV Backend

We are going to make a data dump of CSV files and turn this into our first arrow Dataset. The reason this is realistic is that even the dumbest processes can write out to CSV files, and you can write the results of embarrassingly parallel processes to them (and append, which most Big Data formats do not actually allow since they are column oriented).

CSV_dir = paste0(tempdir(),'/CSV_DB/')
dir.create(CSV_dir)

set.seed(666)
dummy = foreach(i = 1:100)%do%{
  tempDT = data.table(let=letters[sample(26,1e3,TRUE)], num=rnorm(1e3), loc=i)
  fwrite(tempDT, paste0(CSV_dir,'examp_',formatC(i, width=3,flag=0),'.csv')) # files like examp_001.csv
}
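
If you want to sanity check the dump, something simple like the following (plain base R, nothing arrow specific) should confirm the files were written as expected:

length(list.files(CSV_dir)) #should be 100 CSV files
head(list.files(CSV_dir)) #file names of the form examp_001.csv, examp_002.csv, ...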

We can actually immediately operate on this as an Arrow database:

CSV_DB = open_dataset(CSV_dir, format='csv')

output1 = CSV_DB %>% #output table will be output1, input is the CSV_DB
filter(let == 'a') %>% #keep only rows where let == 'a'
collect %>% #collect actually executes the query and creates the result
setDT # setDT turns it into a data.table (which I prefer)

output2 = CSV_DB %>% filter(loc == 2) %>% collect %>% setDT #we put it on one line for compactness

output3 = CSV_DB %>% filter(num > 0) %>% collect %>% setDT
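
Note that nothing is actually read from disk until the collect step: the intermediate object is just a lazy query. As a small sketch (using the CSV_DB we just opened), you can inspect the pending query before executing it:

lazy_query = CSV_DB %>% filter(let == 'a') #no data has been read from disk yet
print(lazy_query) #prints the pending query and schema, not the rows
lazy_result = lazy_query %>% collect %>% setDT #only now is the disk actually touched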

We can have a look to check the outputs look like we would expect:

print(output1)
##       let        num loc
##    1:   a  0.3642015   3
##    2:   a -1.7487617   3
##    3:   a  0.2855701   3
##    4:   a -1.0879064   3
##    5:   a -0.5275684   3
##   ---                   
## 3890:   a  0.1532621 100
## 3891:   a  0.1960444 100
## 3892:   a -1.2246335 100
## 3893:   a  0.3052662 100
## 3894:   a -1.3971481 100
print(output2)
##       let           num loc
##    1:   l -2.1058920734   2
##    2:   x -0.0141862716   2
##    3:   b  1.2092090765   2
##    4:   f  1.6265735126   2
##    5:   k  0.7010012254   2
##   ---                      
##  996:   o  0.0005219908   2
##  997:   e  1.1955614497   2
##  998:   u -0.4728802333   2
##  999:   d -0.4936660924   2
## 1000:   l -0.0305460295   2
print(output3)
##        let        num loc
##     1:   b 1.20920908   2
##     2:   f 1.62657351   2
##     3:   k 0.70100123   2
##     4:   d 0.08132259   2
##     5:   n 0.95825847   2
##    ---                   
## 50213:   r 0.47952574 100
## 50214:   p 0.09848961 100
## 50215:   g 0.14690802 100
## 50216:   t 0.27756798 100
## 50217:   x 0.56339679 100

The default selects all columns, but we can target a subset easily:

output4 = CSV_DB %>%
filter(num > 0, let=='a') %>% #a more complex filter
select(num, let) %>% # just num/let columns to be selected (in different order)
collect %>% setDT

print(output4)
##               num let
##    1: 1.086742682   a
##    2: 0.001793171   a
##    3: 0.574302647   a
##    4: 0.146423651   a
##    5: 1.217304737   a
##   ---                
## 1928: 0.842571044   a
## 1929: 1.782206219   a
## 1930: 0.153262050   a
## 1931: 0.196044375   a
## 1932: 0.305266190   a

Here we filter to only positive ‘num’ where ‘let’ is ‘a’, and only keep those two columns.

We can also create new columns as a function of other columns using mutate:

output5 = CSV_DB %>%
filter(num > 0, let=='a') %>% #a more complex filter
mutate(mut = num*loc) %>% #pretty much any function you like here
select(num, loc, mut) %>% #select some columns we care about
collect %>% setDT

print(output5)
##               num loc          mut
##    1: 1.086742682   1 1.086743e+00
##    2: 0.001793171   1 1.793171e-03
##    3: 0.574302647   1 5.743026e-01
##    4: 0.146423651   1 1.464237e-01
##    5: 1.217304737   1 1.217305e+00
##   ---                             
## 1928: 0.842571044 100 8.425710e+01
## 1929: 1.782206219 100 1.782206e+02
## 1930: 0.153262050 100 1.532621e+01
## 1931: 0.196044375 100 1.960444e+01
## 1932: 0.305266190 100 3.052662e+01
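
Aggregations work in much the same way. As a rough sketch (assuming the usual dplyr group_by and summarise verbs, which arrow supports for common aggregations like mean and n; output6 is just an illustrative name), we can compute per-letter statistics directly against the on-disk Dataset:

output6 = CSV_DB %>%
group_by(let) %>% #group the Dataset by the letter column
summarise(mean_num = mean(num), count = n()) %>% #aggregate within each group
collect %>% setDT #as before, nothing is computed until collect

print(output6) #should have 26 rows, one per letter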

Parquet Backend

Parquet is the main on-disk format used with Apache Arrow, supporting memory mapping and parallel IO, with column data that is losslessly compressed by default. It should be small on disk and rapid to interact with. It is also considered a stable archival grade format now the library has progressed beyond v1.0. Being a compressed binary format, there might be some caveats to the archival nature of it (versus human readable CSV), but many big companies and long-term projects are committing to it. It also works nicely in TOPCAT (which is important to astronomers).

For speed (and bigger files) we might want to convert the database to a Parquet backend:

Parq_dir = paste0(tempdir(),'/Parq_DB/')

write_dataset(CSV_DB, Parq_dir, format='parquet', partitioning='loc') #partitioning tells it how to split the database
Parq_DB = open_dataset(Parq_dir, format='parquet')
print(Parq_DB) #should be 100 files
## FileSystemDataset with 100 Parquet files
## let: string
## num: double
## loc: int32
output1b = Parq_DB %>% #output table will be output1b, input is the Parq_DB
filter(let == 'a') %>%
collect %>% setDT

output2b = Parq_DB %>% filter(loc == 2) %>% collect %>% setDT

output3b = Parq_DB %>% filter(num > 0) %>% collect %>% setDT

This should look the same (though the row order can differ, since the partitioned files may be read in a different order)!

print(output1b)
##       let         num loc
##    1:   a  0.65379107  11
##    2:   a -0.78061440  11
##    3:   a  0.35201756  11
##    4:   a -0.44398835  11
##    5:   a -2.12741659  11
##   ---                    
## 3890:   a  1.71123052  99
## 3891:   a -0.08407648  99
## 3892:   a  0.26322946  99
## 3893:   a  1.12295305  99
## 3894:   a -1.54637400  99
print(output2b)
##       let           num loc
##    1:   l -2.1058920734   2
##    2:   x -0.0141862716   2
##    3:   b  1.2092090765   2
##    4:   f  1.6265735126   2
##    5:   k  0.7010012254   2
##   ---                      
##  996:   o  0.0005219908   2
##  997:   e  1.1955614497   2
##  998:   u -0.4728802333   2
##  999:   d -0.4936660924   2
## 1000:   l -0.0305460295   2
print(output3b)
##        let        num loc
##     1:   k 1.13498921   1
##     2:   e 1.02087705   1
##     3:   l 0.03484328   1
##     4:   a 1.08674268   1
##     5:   e 1.47200582   1
##    ---                   
## 50213:   x 1.68096132  99
## 50214:   l 0.54056073  99
## 50215:   t 0.66370282  99
## 50216:   y 1.26086717  99
## 50217:   t 1.28964539  99

So, which is faster: CSV or Parquet?! Let’s see!

microbenchmark(
  {output3 = CSV_DB %>% filter(num > 0) %>% collect %>% setDT},
  {output3b = Parq_DB %>% filter(num > 0) %>% collect %>% setDT}
)
## Unit: milliseconds
##                                                                  expr      min
##    {     output3 = CSV_DB %>% filter(num > 0) %>% collect %>% setDT } 30.79171
##  {     output3b = Parq_DB %>% filter(num > 0) %>% collect %>% setDT } 44.50171
##        lq     mean   median       uq      max neval
##  32.19469 34.31773 32.72936 33.40171 66.04814   100
##  47.43878 49.92886 48.67548 49.84028 68.52914   100

In this case they are pretty similar (CSV is actually a bit faster), but when the individual files get up to many GB in size a Parquet backend will be significantly faster.
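
On top of speed, the other practical difference is the disk footprint. A rough way to compare the two backends (a sketch using base R file.size; the recursive flag is needed because the partitioned Parquet files sit in sub-directories) would be:

sum(file.size(list.files(CSV_dir, full.names=TRUE)))/1e6 #total CSV size in MB
sum(file.size(list.files(Parq_dir, full.names=TRUE, recursive=TRUE)))/1e6 #total Parquet size in MB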

Where you will see big speed-ups is if you partition based on what you are likely to search on:

Parq_dir2 = paste0(tempdir(),'/Parq_DB2/')

write_dataset(CSV_DB, Parq_dir2, format='parquet', partitioning='let')
Parq_DB2 = open_dataset(Parq_dir2, format='parquet')
print(Parq_DB2) #should be 26 files
## FileSystemDataset with 26 Parquet files
## num: double
## loc: int64
## let: string
output1c = Parq_DB2 %>% filter(let == 'a') %>% collect %>% setDT

output2c = Parq_DB2 %>% filter(loc == 2) %>% collect %>% setDT

output3c = Parq_DB2 %>% filter(num > 0) %>% collect %>% setDT

These should contain the same rows as before (only the row order can differ, because of the different partitioning); for comparison, here are the earlier Parquet outputs again:

print(output1b)
##       let         num loc
##    1:   a  0.65379107  11
##    2:   a -0.78061440  11
##    3:   a  0.35201756  11
##    4:   a -0.44398835  11
##    5:   a -2.12741659  11
##   ---                    
## 3890:   a  1.71123052  99
## 3891:   a -0.08407648  99
## 3892:   a  0.26322946  99
## 3893:   a  1.12295305  99
## 3894:   a -1.54637400  99
print(output2b)
##       let           num loc
##    1:   l -2.1058920734   2
##    2:   x -0.0141862716   2
##    3:   b  1.2092090765   2
##    4:   f  1.6265735126   2
##    5:   k  0.7010012254   2
##   ---                      
##  996:   o  0.0005219908   2
##  997:   e  1.1955614497   2
##  998:   u -0.4728802333   2
##  999:   d -0.4936660924   2
## 1000:   l -0.0305460295   2
print(output3b)
##        let        num loc
##     1:   k 1.13498921   1
##     2:   e 1.02087705   1
##     3:   l 0.03484328   1
##     4:   a 1.08674268   1
##     5:   e 1.47200582   1
##    ---                   
## 50213:   x 1.68096132  99
## 50214:   l 0.54056073  99
## 50215:   t 0.66370282  99
## 50216:   y 1.26086717  99
## 50217:   t 1.28964539  99

But now we can see faster searches on letters since the database is arranged by letter in the first place:

microbenchmark(
  {output1 = CSV_DB %>% filter(let == 'a') %>% collect %>% setDT},
  {output1b = Parq_DB %>% filter(let == 'a') %>% collect %>% setDT},
  {output1c = Parq_DB2 %>% filter(let == 'a') %>% collect %>% setDT}
)
## Unit: milliseconds
##                                                                               expr
##              {     output1 = CSV_DB %>% filter(let == "a") %>% collect %>% setDT }
##   {     output1b = Parq_DB %>% filter(let == "a") %>% collect %>%          setDT }
##  {     output1c = Parq_DB2 %>% filter(let == "a") %>% collect %>%          setDT }
##       min       lq     mean   median       uq      max neval
##  27.99123 30.15786 31.70683 30.72048 31.67343 54.54173   100
##  41.24782 43.67176 46.57929 44.98584 46.53721 64.30084   100
##  25.26759 26.31984 28.50181 27.04371 27.78674 45.56395   100

Parquet can store more complex metadata (than just column names), but for the most part the preference seems to be to keep the backend files flat and simple.
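
If you do want to poke at that metadata, the schema of a Dataset is accessible directly. As a small hedged example (inspecting the schema of the Parquet Dataset made below; the exact metadata present will depend on how the files were written):

print(Parq_DB3$schema) #column names and types for the Dataset
print(Parq_DB3$schema$metadata) #any extra key-value metadata stored in the schema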

As a general method for working on these databases, my feeling is you should probably generate a dumb CSV backend version as the starting point, and if this is fast enough for you then potentially leave it at that, but use the dplyr methods to operate on the database through arrow (rather than writing your own error-prone scripts to loop around the data and assemble the results). If a CSV backend is prohibitively slow (many GBs in size), then at that point it might be worth creating a Parquet backend, with some thought as to the partitions to use to speed up later searches.

Convert an In-Memory Table to Arrow Dataset

If you already have your target data loaded into an R object (i.e. it fits in memory) then you can directly write it out as an arrow Dataset. This might be the case where you have a machine with a lot of memory, but you want to share the Dataset so that people using a small-memory laptop over an internet connection can still do useful things.

tempDT = data.table(let=letters[sample(26,1e5,TRUE)], num=rnorm(1e5), loc=rep(1:100,each=1e3))
Parq_dir3 = paste0(tempdir(),'/Parq_DB3/')

write_dataset(tempDT, Parq_dir3, format='parquet', partitioning=c('let','loc'))
Parq_DB3 = open_dataset(Parq_dir3, format='parquet')
print(Parq_DB3) #should be 2600 files
## FileSystemDataset with 2600 Parquet files
## num: double
## let: string
## loc: int32
## 
## See $metadata for additional Schema metadata
output1d = Parq_DB3 %>% filter(let == 'a') %>% collect %>% setDT

output2d = Parq_DB3 %>% filter(loc == 2) %>% collect %>% setDT

output3d = Parq_DB3 %>% filter(num > 0) %>% collect %>% setDT

These results will be a bit different to earlier because the data was generated differently (one large random table in a single go, rather than 100 random chunks):

print(output1d)
##               num let loc
##    1:  0.19330141   a  10
##    2: -1.51991807   a  10
##    3:  1.15972309   a  10
##    4:  0.55348833   a  10
##    5:  0.09285332   a  10
##   ---                    
## 3837: -0.08027721   a  99
## 3838: -0.32342454   a  99
## 3839:  0.59508267   a  99
## 3840: -0.77474554   a  99
## 3841: -0.72823871   a  99
print(output2d)
##               num let loc
##    1:  1.33059576   a   2
##    2: -1.19440246   a   2
##    3:  0.03229057   a   2
##    4: -0.57792844   a   2
##    5: -0.31977000   a   2
##   ---                    
##  996: -0.48408501   z   2
##  997: -0.51388739   z   2
##  998: -1.24268100   z   2
##  999: -1.38324315   z   2
## 1000: -0.36558996   z   2
print(output3d)
##               num let loc
##     1: 1.70315031   a   1
##     2: 0.01907913   a   1
##     3: 1.05478725   a   1
##     4: 1.92838518   a   1
##     5: 0.07668439   a   1
##    ---                   
## 49871: 0.15182845   z  99
## 49872: 0.71288153   z  99
## 49873: 0.02316889   z  99
## 49874: 0.19865002   z  99
## 49875: 0.54208505   z  99

Growing Your Database

It is pretty common in astronomy that you might want to append new rows to your data as you go, e.g. you are processing tiles from a larger photometric survey and add data every night. In this case the best route is to stick with a CSV backend system where you either create new files to represent the new data, or perhaps append to an existing file (possible since CSV and similar formats are row major, so adding rows is easy). Because Parquet is column major there is currently (Dec 2022) no route to append to a file without the full cycle of reading it in, appending in memory, and writing it back out. For that reason writing the outputs from a streaming process should nearly always be done to something like a CSV file.
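
As a sketch of what that can look like in practice (the new file name and contents here are just made-up examples following the same pattern as above), new data is simply written as additional CSV files into the same directory, and re-opening the Dataset picks them up:

newDT = data.table(let=letters[sample(26,1e3,TRUE)], num=rnorm(1e3), loc=101) #a new night of data
fwrite(newDT, paste0(CSV_dir,'examp_101.csv')) #drop it into the same directory as before
CSV_DB = open_dataset(CSV_dir, format='csv') #re-open to pick up the new file
nrow(CSV_DB) #should now report 101000 rows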

Once your data is pretty static (no major row additions expected, and no new columns to be added), that is the point at which you might want to convert the whole lot to a Parquet backend.
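
For completeness, a hedged sketch of that final conversion (re-using the open_dataset and write_dataset pattern from above, with a hypothetical target directory) is just:

Parq_dir_final = paste0(tempdir(),'/Parq_DB_final/') #hypothetical target directory
write_dataset(open_dataset(CSV_dir, format='csv'), Parq_dir_final, format='parquet', partitioning='loc')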