Distributed R on the CRC without the agonizing pain

Patrick Miller
February 9, 2017

Distributed R on the CRC without the agonizing pain

agonizing

Distributed R on the CRC with less agonizing pain

devtools::install_github("patr1ckm/distributr")

Slides: http://rpubs.com/patr1ckm/distributr

Topics Today

  • Grid search with distributr
    • gapply
  • Parallelization
    • gapply(..., .mc.cores=8)
  • CRC / Sun Grid Engine
    • setup, submit, collect, tidy

Topics Today

  • Grid search with distributr
    • gapply
  • Parallelization
    • gapply(..., .mc.cores=8)
  • CRC / Sun Grid Engine
    • setup, submit, collect, tidy

Monte Carlo Simulation

Some examples:

  • Power or Type I error as a function of \( n \), effect size, etc.
  • Quantifying performance of an estimator/algorithm when assumptions aren't met
  • Comparing estimators/algorithms
  • Verifying analytical results
  • Developing empirical corrections

Monte Carlo Simulation

Often want to make quantifications over a grid of parameters or arguments (grid search)

Grid Search

Example: Power for testing \( H_0: \beta_j = 0 \) in linear regression

n <- c(50, 100)
sig <- c(1, 2)
b <- c(.1, .5)
(grid <- expand.grid(n=n, sig=sig, b=b))
    n sig   b
1  50   1 0.1
2 100   1 0.1
3  50   2 0.1
4 100   2 0.1
5  50   1 0.5
6 100   1 0.5
7  50   2 0.5
8 100   2 0.5

Grid Search

Example: Loop over the conditions

res_loop <- list()
for(i in 1:nrow(grid)){
  pvals <- matrix(NA, 5, 2)
  cond <- grid[i, ]
  for(r in 1:5){
    n <- cond$n; sig <- cond$sig
    b <- cond$b
    x <- matrix(rnorm(n), n, 1)
    y <- x %*% b + rnorm(n, 0, sig)
    pvals[r, ] <- coef(summary(lm(y~x)))[,"Pr(>|t|)"]
  }
  res_loop[[i]] <- pvals
}
res_loop[[1]]
          [,1]      [,2]
[1,] 0.9245824 0.4626288
[2,] 0.7285347 0.8737242
[3,] 0.3863873 0.1110581
[4,] 0.7721305 0.6497601
[5,] 0.4784289 0.2723413

Grid Search

But can we make it simpler and more general?

Looping with lapply

(Wickham, 2014)

f <- function(x){
  x + 5
}
lapply(1:3, f)
[[1]]
[1] 6

[[2]]
[1] 7

[[3]]
[1] 8

Grid Search with gapply

Write the task as a function

f <- function(n, sig, b){ 
  x <- matrix(rnorm(n), n, 1)
  b <- matrix(b, 1, 1)
  y <- x %*% b + rnorm(n, 0, sig)
  coef(summary(lm(y~x)))[,"Pr(>|t|)"]
}

Apply f to a grid of arguments

res <- gapply(f, n=c(50, 100), 
              sig=c(1, 2), b=c(.5, 1))

Grid Search

  • gapply applies a function to a grid of arguments (repeatedly)
  • merges results with the argument grid
library(distributr)
f <- function(n, sig, b){ 
  x <- matrix(rnorm(n), n, 1)
  b <- matrix(b, 1, 1)
  y <- x %*% b + rnorm(n, 0, sig)
  coef(summary(lm(y~x)))[,"Pr(>|t|)"]
}
res_gapply <- gapply(f,
                     n=c(50, 100), sig=c(1, 2), b=c(.5, 1), 
                     .reps=5, .verbose=0)
res_gapply[1:5, ]
   n sig   b .rep (Intercept)            x
1 50   1 0.5    1  0.76893007 0.0003432391
2 50   1 0.5    2  0.48672825 0.0059409119
3 50   1 0.5    3  0.45300281 0.0040870328
4 50   1 0.5    4  0.05653914 0.0019590826
5 50   1 0.5    5  0.21125299 0.0060946085

Grid Search

Compare the code between loop and gapply

res_loop <- list()
grid <- expand.grid(n=c(50, 100), sig=c(1, 2), b=c(.1, .5))
for(i in 1:nrow(grid)){
  pvals <- matrix(NA, 5, 2)
  cond <- grid[i, ]
  for(r in 1:5){
    pvals[r, ] <- f(n=cond$n, sig=cond$sig, b=cond$b)
  }
  res_loop[[i]] <- pvals
}
res_gapply <- gapply(f, 
                     n=c(50, 100), sig=c(1, 2), b=c(.5, 1), 
                     .reps=5, .verbose=0)
  • No temporary storage (pvals, cond)
  • No nested loop

Grid Search

Compare the results between loop and gapply

res_gapply[1:5, ]
   n sig   b .rep (Intercept)            x
1 50   1 0.5    1   0.4218106 7.771227e-03
2 50   1 0.5    2   0.9383055 2.808970e-02
3 50   1 0.5    3   0.8766481 4.602921e-03
4 50   1 0.5    4   0.3592465 1.598595e-04
5 50   1 0.5    5   0.6087088 2.762259e-08
res_loop[[1]]
          [,1]       [,2]
[1,] 0.2351734 0.03380105
[2,] 0.3784600 0.49035658
[3,] 0.1423843 0.36743578
[4,] 0.6954699 0.65827891
[5,] 0.8069454 0.42568038
  • Now you can work with results and argument grid

Topics Today

  • Grid search with distributr
    • gapply
  • Parallelization
    • gapply(..., .mc.cores=8)
  • Sun Grid Engine
    • setup, submit, collect, tidy

Parallelization

Can we make the grid search faster?

res <- gapply(f, 
  n=c(50, 100), sig=c(1, 2), b=c(.05, .25), 
  .reps=500, .verbose=0)
elapsed 
  5.049 

Parallelization: How it works

cpu

  • GHZ: Speed at which single instructions are executed
  • Core: how many instructions can be executed at once

Parallelization: How it works

cpu

  • thread: simulatenous instructions that can be executed on the same core
  • each physical core has 2 threads
  • 4 x 2 = 8 total cores

Parallelization in R (Mac/Unix)

Principle: if you can write it in lapply, it can be parallelized

system.time(
  lapply(1:3, function(x){Sys.sleep(.3)}))
elapsed 
  0.905 
library(parallel)
system.time(
  mclapply(1:3, function(x){Sys.sleep(.3)}, 
           mc.cores=3))
elapsed 
  0.316 

Parallelization in R (Windows)

On windows spin up separate processes:

library(parallel)
cl <- makePSOCKcluster(3)
system.time(
  parLapply(cl, X=1:3, 
            fun=function(x){Sys.sleep(.3)}))
stopCluster(cl = cl)
elapsed 
  0.306 

Parallelization with grid search

res_p <- gapply(f, 
  n=c(50, 100), sig=c(1, 2), b=c(.05, .25), 
  .reps=500, .verbose=0, .mc.cores=8)
not_parallel.elapsed     parallel.elapsed 
               5.049                1.178 

Topics Today

  • Grid search with distributr
    • gapply
  • Parallelization
    • gapply(..., .mc.cores=8)
  • CRC / Sun Grid Engine
    • setup, submit, collect, tidy

CRC / Sun Grid Engine: Why?

If you have ever

  • let something run on your laptop overnight
  • been tempted to run analyses on computer lab machines
  • ran analyses for more than a few hours
  • limited the scope of experiments so that they would run quickly on your laptop

Consider using campus computing cluster run by the Center for Research Computing (CRC).

https://crc.nd.edu/

CRC / Sun Grid Engine: How it works

clusterd

$ ssh pmille13@crcfe01.crc.nd.edu
pmille13@crcfe02.crc.nd.edu's password: 

Welcome to the Notre Dame Center for Research Computing (CRC).
The CRC Home Page can be found at: http://crc.nd.edu

CRC / Sun Grid Engine: How it works

cluster

Queue: Long (General)

  • 176 Nodes
  • 24 Cores / Node
  • 256 GB RAM / Node

Queue: DACCS (Social Sciences)

  • 54 Nodes
  • 64 Cores / Node
  • 128 GB RAM / Node

CRC / Sun Grid Engine: How it works

  • Since computing resources are shared, jobs are submitted to a scheduler
  • The scheduler assigns jobs to nodes when the nodes are available
$ qsub submit
#!/bin/csh

#$ -M netid@nd.edu   # Email address for job notification
#$ -m abe            # Send mail when job begins, ends and aborts
#$ -pe smp 24        # Specify parallel environment and legal core size
#$ -q long           # Specify queue
#$ -N job_name       # Specify job name

module load R        # Required modules

Rscript run_my_thing.R 

CRC / Sun Grid Engine

lapply_clustr

(Wickham, 2014)

  • f is applied to a set of arguments a
  • Send unique conditions to separate nodes
  • Carry out replications in parallel on each node

CRC / Sun Grid Engine

lapply_clustr

  • Elements in x indexed by $SGE_TASK_ID
#!/bin/csh

#$ ...
#$ -t 1-50           # Task ids

module load R        

Rscript run_f.R

CRC / Sun Grid Engine: Working Demo

$ mkdir test
$ ls test
ex1.R  submit
$ cd test
$ module load R
$ cat submit
$ cat ex1.R
$ qsub submit
Your job-array 730899.1-4:1 ("ex1") has been submitted

CRC / Sun Grid Engine: Working Demo

Sample output

$ ls 
ex1.o730899.1  ex1.o730899.2  ex1.o730899.3  ex1.o730899.4  ex1.R  res1.Rdata  res2.Rdata  res3.Rdata  res4.Rdata  submit
$ R
load("res1.Rdata")
res
[1] "1" # read the variable SGE_TASK_ID

Challenges ahead

pass

distributr

Principle: if you can write it in gapply, it can be run on the CRC

distributr

CRC / Sun Grid Engine: Grid Search with distributr

The quickest way to get running:

devtools::install_github("patr1ckm/distributr")
library(distributr)
plan <- sge_test()  
tidy(collect())
   x y .sge_id .rep value
1  3 1       1    1     3
2  4 1       2    1     4
3  5 1       3    1    NA
4  6 1       4    1    NA
...

CRC / Sun Grid Engine: Grid Search with distributr

library(distributr)
f <- function(n, sig, b){ 
  x <- matrix(rnorm(n), n, 1)
  b <- matrix(b, 1, 1)
  y <- x %*% b + rnorm(n, 0, sig)
  coef(summary(lm(y~x)))[,"Pr(>|t|)"]
}
sim <- gapply(f, 
               n=c(50, 100), sig=c(1, 2), b=c(.5, 1), 
               .verbose=0, .eval=FALSE)
plan <- setup(sim, .dir="tmp", .reps=5, .mc.cores=5)    
mkdir -p tmp/results
mkdir -p tmp/SGE_Output
touch tmp/submit
writing arg_grid.Rdata

CRC / Sun Grid Engine: Grid Search with distributr

submit("tmp")
res <- tidy(collect(dir = "tmp"))
head(res)
    n sig   b .sge_id .rep (Intercept)            x
1  50   1 0.5       1    1  0.57734268 1.321219e-05
2  50   1 0.5       1    2  0.87405905 1.105145e-03
3  50   1 0.5       1    3  0.15543963 1.171892e-02
4  50   1 0.5       1    4  0.00774973 4.230073e-05
5  50   1 0.5       1    5  0.83558643 5.429647e-04
6 100   1 0.5       2    1  0.61792310 1.452060e-08

CRC / Sun Grid Engine: distributr other features

Other features

setup(plan, .seed=1)             # each node has a non-overlapping seed
setup(plan, .mc.cores=c(12, 16)) # can specify a range of cores
add_jobs(plan, ...)              # adds jobs to plan

library(dplyr)                   
filter_jobs(plan, a < 1)         # submit jobs matching criteria
collect(filter="a < 1")    # results for jobs matching criteria 
tidy(..., stack=TRUE)            # tidy when results have varying length

More examples and features described on the wiki: https://github.com/patr1ckm/distributr/wiki

  • Job and replication chunking
  • Caching intermediate results

distributr: Benefits and Limitations

Reasons why I built it:

  • Run more conditions / replications, do more pilots, analyze your results faster
  • Incorporates what I have learned about large scale simulations on SGE

Limitations

  • Might not solve your specific problem
  • If something goes wrong, it can be frustrating to debug

Contributions welcome! Pull requests, issues, feedback, bug reports are all extremely helpful.

Topics Today

Congratulations!

  • Grid search with distributr
    • gapply
  • Parallelization
    • mclapply, gapply
  • CRC / Sun Grid Engine
    • setup, submit, collect, tidy

Acknowledgements

  • Gitta Lubke
  • Dan McArtor
  • Alex Brodersen
  • Scott Hampton & the Center for Research Computing

References

Wickham, H. (2014). Advanced R. CRC Press.

Further Exploration

Bengtsson, H. (2016). future: A Future API for R. R package version 1.1.1.

Bischl, B., Lang, M., Mersmann, O., Rahnenführer, J., & Weihs, C. (2015). BatchJobs and BatchExperiments: Abstraction mechanisms for using R in batch environments. Journal of Statistical Software, 64(1), 1-25.

Wickham, H. (2014). Tidy data. Journal of Statistical Software, 59(10), 1-23. Chicago

Wickham, H., & Francois, R. (2015). dplyr: A grammar of data manipulation. R package version 0.4, 1, 20.

Appendix: How gapply works

Expand arguments into a grid and convert to list

f <- function(n, sig, b){ 
  x <- matrix(rnorm(n), n, 1)
  b <- matrix(b, 1, 1)
  y <- x %*% b + rnorm(n, 0, sig)
  coef(summary(lm(y~x)))[,"Pr(>|t|)"]
}
grid <- expand.grid(n=c(50, 100), sig=c(1, 2), b=c(.1, .5))
grid_ls <- split(grid, 1:nrow(grid))
grid_ls[1:2]
$`1`
   n sig   b
1 50   1 0.1

$`2`
    n sig   b
2 100   1 0.1

Appendix: How gapply works

Apply a function to the list of arguments

f <- function(n, sig, b){ 
  x <- matrix(rnorm(n), n, 1)
  b <- matrix(b, 1, 1)
  y <- x %*% b + rnorm(n, 0, sig)
  coef(summary(lm(y~x)))[,"Pr(>|t|)"]
}
grid <- expand.grid(n=c(50, 100), sig=c(1, 2), b=c(.1, .5))
grid_ls <- split(grid, 1:nrow(grid))
res <- lapply(grid_ls, function(args){ do.call(f, args)})
res[1:2]
$`1`
(Intercept)           x 
  0.6635901   0.9546764 

$`2`
(Intercept)           x 
 0.87154454  0.03294307 

Appendix: How gapply works

Evaluate the function repeatedly for each unique combination of arguments

f <- function(n, sig, b){ 
  x <- matrix(rnorm(n), n, 1)
  b <- matrix(b, 1, 1)
  y <- x %*% b + rnorm(n, 0, sig)
  coef(summary(lm(y~x)))[,"Pr(>|t|)"]
}
grid <- expand.grid(n=c(50, 100), sig=c(1, 2), b=c(.1, .5))
grid_ls <- split(grid, 1:nrow(grid))
res <- lapply(grid_ls, function(args){ 
  replicate(5, do.call(f, args))
})
res[1:2]
$`1`
                  [,1]      [,2]      [,3]      [,4]      [,5]
(Intercept) 0.36128185 0.7964964 0.5296521 0.1014231 0.7417084
x           0.03595442 0.7251446 0.7041782 0.1206791 0.9837255

$`2`
                 [,1]      [,2]      [,3]      [,4]      [,5]
(Intercept) 0.5533910 0.8932028 0.5967738 0.9264787 0.9412291
x           0.6999897 0.8592359 0.9839882 0.3514110 0.5483031

Appendix: Complete example

library(distributr)
library(dplyr)
library(ggplot2)

f <- function(n, sig, b){ 
  x <- matrix(rnorm(n), n, 1)
  b <- matrix(b, 1, 1)
  y <- x %*% b + rnorm(n, 0, sig)
  coef(summary(lm(y~x)))[,4]
}
res <- gapply(f, 
  n=c(50, 100), sig=c(1, 2), 
  b=c(.05, .25), 
  .reps=500, .verbose=0)
d <- res %>%
  mutate(x = x < .05) %>% 
  mutate(b = factor(b)) %>%
  group_by(n, sig, b) %>% 
  summarize(power = mean(x))

p <- ggplot(d, 
  aes(y=power, x=n, color=b)) + 
  geom_line() +
  facet_grid(~sig, 
    labeller = label_both)

plot of chunk unnamed-chunk-45

Appendix: DIY with Raspberry Pi?

pi

  • 32 Raspberry Pis with 8GB SSD: $45 x 32 = $1440
  • Chromebook for head node = $250
  • Enclosure = $150
  • Total: < $2000

Pros:

  • The lights obviously