library(parallel)
library(foreach)
## Warning: package 'foreach' was built under R version 4.0.3
library(future.apply)
## Warning: package 'future.apply' was built under R version 4.0.3
## Loading required package: future
## Warning: package 'future' was built under R version 4.0.3
library(future)
library(microbenchmark)
## Warning: package 'microbenchmark' was built under R version 4.0.3
# parallel support for big data
# library(sparklyr)
# library(iotools)
# library(pbdR)

types of partition

  1. By task: Apply different tasks to the same of different data
  2. By data: The same tasks is performed on different data

models of parallel computing

  1. In order to run the code in parallel, we need at leat two ideally more processors or cores
  2. Shared memory, distributed memory

Programming paradaigms

  • Master-worker model: There is one process called master, which creates a set of other processes, the workers, and distributes tasks among them. workers perform their tasks and return to the master process, which in turn performs the final processing
  • Map-reduce paradigm
    • Applications for distributed data
    • Hadoop spark
# From previous step
mean_of_rnorm <- function(n) {
  random_numbers <- rnorm(n)
  mean(random_numbers)
}

# Repeat n_numbers_per_replicate, n_replicates times
n <- rep(10000, 50)

# Call mean_of_rnorm() repeatedly using sapply()
result <- sapply(
  # The vectorized argument to pass
  n, 
  # The function to call
  mean_of_rnorm
)

# View the results
hist(result)

ar1_block_of_trajectories <- function(id, rate0 = 0.015, traj_len = 15, block_size = 10) {
    trajectories <- matrix(NA, nrow = block_size, ncol = traj_len)
    for (i in seq_len(block_size)) 
        trajectories[i,] <- ar1_one_trajectory(unlist(ar1est[id, ]), 
                                rate0 = rate0, len = traj_len)
    trajectories
}


# From previous step
ar1_multiple_blocks_of_trajectories <- function(ids, ...) {
  trajectories_by_block <- lapply(ids, ar1_block_of_trajectories, ...)
  do.call(rbind, trajectories_by_block)
}

# Create a sequence from 1 to number of blocks
traj_ids <- seq_len(nrow(ar1est))

# Generate trajectories for all rows of the estimation dataset
trajs <- ar1_multiple_blocks_of_trajectories(
  ids = traj_ids, rate0 = 0.015,
  block_size = 10, traj_len = 15
)

# Show results
show_migration(trajs)

Parallel package

The current R session serves as the master process, while each worker is a separate R process

Multicore functionality (Not available on windows)

  • mcapply
  • mcmapply
  • mcMap
ncores <- detectCores(logical = FALSE)
n <- 100:1

# create a cluster of nodes, or a cluster of workers
cl <- makeCluster(ncores)

microbenchmark(Parallelized = clusterApply(cl, x = n, fun = rnorm, mean = 10, sd = 2),
               Normal = lapply(n, rnorm, mean = 10, sd = 2))
## Unit: microseconds
##          expr     min       lq      mean   median      uq     max neval
##  Parallelized 13002.0 14890.60 16350.226 15995.45 17362.8 23354.3   100
##        Normal   521.5   607.25   675.891   657.00   719.4  1156.2   100
clusterApply(cl, x = ncores:1, fun = rnorm)
## [[1]]
## [1] 0.4724191 1.7120917 0.7144003 0.5974406
## 
## [[2]]
## [1]  0.6019177 -0.4272496  0.2712594
## 
## [[3]]
## [1] 0.5464604 0.8454547
## 
## [[4]]
## [1] -0.7838051
stopCluster(cl)

Sum in parallel

# Evaluate partial sums in parallel
cl <- makeCluster(ncores)

part_sums <- clusterApply(cl, x = c(1, 51),
                    fun = function(x) sum(x:(x + 49)))
# Total sum
total <- sum(unlist(part_sums))

# Check for correctness
total == sum(1:100)
## [1] TRUE
stopCluster(cl)

Mean of random numbers in parallel

# Create a cluster and set parameters
cl <- makeCluster(2)
n_replicates <- 50
n_numbers_per_replicate <- 10000

# Parallel evaluation on n_numbers_per_replicate, n_replicates times
means <- clusterApply(cl, 
             x = rep(n_numbers_per_replicate, n_replicates), 
             fun = mean_of_rnorm)
                
# View results as histogram
hist(unlist(means))

stopCluster(cl)

Supported backends of parallel library

Socket communication (dafault, all OS platforms)

  • All workers start with an empty environment, the master hast to take care of sending everything they need for their computing tasks
cl <- makeCluster(ncores, type = "PSOCK")

Forking (not available for windows)

  • Workers are complete copies of the master process. They know all about all global objects and functions, which can significantly decrease the communications needs
cl <- makeCluster(ncores, type = "FORK")

Using the MPI library (uses Rmpi)

  • Better efficiency for applications
cl <- makeCluster(ncores, type = "MPI")
# Make a cluster with 4 nodes
cl <- makeCluster(4)

# Investigate the structure of cl
str(cl)
## List of 4
##  $ :List of 3
##   ..$ con : 'sockconn' int 9
##   .. ..- attr(*, "conn_id")=<externalptr> 
##   ..$ host: chr "localhost"
##   ..$ rank: num 0
##   ..- attr(*, "class")= chr "SOCKnode"
##  $ :List of 3
##   ..$ con : 'sockconn' int 10
##   .. ..- attr(*, "conn_id")=<externalptr> 
##   ..$ host: chr "localhost"
##   ..$ rank: num 1
##   ..- attr(*, "class")= chr "SOCKnode"
##  $ :List of 3
##   ..$ con : 'sockconn' int 11
##   .. ..- attr(*, "conn_id")=<externalptr> 
##   ..$ host: chr "localhost"
##   ..$ rank: num 2
##   ..- attr(*, "class")= chr "SOCKnode"
##  $ :List of 3
##   ..$ con : 'sockconn' int 12
##   .. ..- attr(*, "conn_id")=<externalptr> 
##   ..$ host: chr "localhost"
##   ..$ rank: num 3
##   ..- attr(*, "class")= chr "SOCKnode"
##  - attr(*, "class")= chr [1:2] "SOCKcluster" "cluster"
# What is the process ID of the workers?
clusterCall(cl, Sys.getpid)
## [[1]]
## [1] 14644
## 
## [[2]]
## [1] 14548
## 
## [[3]]
## [1] 20256
## 
## [[4]]
## [1] 9484
# Stop the cluster
stopCluster(cl)
print_global_var <-
function() {print(a_global_var)}


# A global variable and is defined
a_global_var <- "before"

# Create a socket cluster with 2 nodes
cl_sock <- makeCluster(2, type = "PSOCK")

# Evaluate the print function on each node

tryCatch({
  clusterCall(cl_sock, print_global_var)
},
error = function(e){
  message("ERROR: global variables not found")
  })
## ERROR: global variables not found
# Stop the cluster
stopCluster(cl_sock)
# A global variable and is defined
a_global_var <- "before"

# Create a fork cluster with 2 nodes
cl_fork <- makeCluster(2, type = "FORK")

# Evaluate the print function on each node

clusterCall(cl_fork, print_global_var)

# Stop the cluster
stopCluster(cl_fork)

Core functions of parallel library

  • clusterApply: apply function in parallel
  • clusterApplyLB: ensures the work is distributed fairly among workers, where LB means Load balanced

Wrappers fo clusterApply function * parApply * parLapply * parSapply

Work analogously to their sequentials counterparts apply, lapply, sapply.

  • parRapply
  • parCApply

Are parallel row and column apply functions for matrices.

  • parLapplyLB
  • parSapplyLB

ClusterApply()

clusterApply(cl, x = arg.sequence, fun = myfunc): The length of the argument x determines the number of tasks that is sent to workers. c(1,2) two tasks, c(1,2,3) three tasks and so on..

parallel vs. sequential

Processing overhead

  • Starting/stopping cluster takes time
  • Number of messages sent between nodes and master
  • Size of messages (sending big data is expensive)

Things to consider

  • How big is a single task
  • How much data need to be sent
  • How much gain is there by running it in parallel -> Benchmark

Example

cl <- makeCluster(ncores, type = "PSOCK")
# Wrap this code into a function
mean_of_rnorm_sequentially <-function(n_numbers_per_replicate, n_replicates){ 
n <- rep(n_numbers_per_replicate, n_replicates)
lapply(n, mean_of_rnorm)
}

mean_of_rnorm_in_parallel <- function(n_numbers_per_replicate, n_replicates){ 
n <- rep(n_numbers_per_replicate, n_replicates)
clusterApply(cl, n, mean_of_rnorm) 
}


# Set numbers per replicate to 5 million
n_numbers_per_replicate <- 5000000

# Set number of replicates to 4
n_replicates <- 4

# Run a microbenchmark
microbenchmark(
  # Call mean_of_rnorm_sequentially()
  sequential = mean_of_rnorm_sequentially(n_numbers_per_replicate, n_replicates), 
  # Call mean_of_rnorm_in_parallel()
  parallel = mean_of_rnorm_in_parallel(n_numbers_per_replicate, n_replicates ),
  times = 1, 
  unit = "s"
)
## Unit: seconds
##        expr       min        lq      mean    median        uq       max neval
##  sequential 1.5990750 1.5990750 1.5990750 1.5990750 1.5990750 1.5990750     1
##    parallel 0.5822655 0.5822655 0.5822655 0.5822655 0.5822655 0.5822655     1
stopCluster(cl)
cl <- makeCluster(ncores, type = "PSOCK")

# Set numbers per replicate to 5 million
n_numbers_per_replicate <- 100

# Set number of replicates to 4
n_replicates <- 100
# Run a microbenchmark
microbenchmark(
  # Call mean_of_rnorm_sequentially()
  sequential = mean_of_rnorm_sequentially(n_numbers_per_replicate, n_replicates), 
  # Call mean_of_rnorm_in_parallel()
  parallel = mean_of_rnorm_in_parallel(n_numbers_per_replicate, n_replicates ),
  times = 1, 
  unit = "s"
)
## Unit: seconds
##        expr       min        lq      mean    median        uq       max neval
##  sequential 0.0035436 0.0035436 0.0035436 0.0035436 0.0035436 0.0035436     1
##    parallel 0.0262385 0.0262385 0.0262385 0.0262385 0.0262385 0.0262385     1
stopCluster(cl)

initialization of nodes

  • Each cluster node starts with an empty environment (no libraries loaded)
  • Repeated communication with the master is expensive

Good practice: Master initializes workers at the beginning with everything that stays constant or/and is time consuming

  • Sending static data

  • loading libraries

  • eValuating global function

  • clusterCall Evaluates the same function with the same arguments on all nodes.

cl <- makeCluster(2)
clusterCall(cl, function() library(dplyr))
## [[1]]
## [1] "dplyr"     "stats"     "graphics"  "grDevices" "utils"     "datasets" 
## [7] "methods"   "base"     
## 
## [[2]]
## [1] "dplyr"     "stats"     "graphics"  "grDevices" "utils"     "datasets" 
## [7] "methods"   "base"
stopCluster(cl)

clusterEvalQ evaluates a literal expression on all nodes, which is a parallel equivalent of evalq() function

cl <- makeCluster(2)
clusterEvalQ(cl, {
  library(dplyr)
  df <- tibble()
} )
## [[1]]
## data frame with 0 columns and 0 rows
## 
## [[2]]
## data frame with 0 columns and 0 rows
stopCluster(cl)

clusterExport Exports given objects from master to workers

expr <- "hi!"
cl <- makeCluster(2)
clusterExport(cl, "expr")
stopCluster(cl)
cl <- makeCluster(2)

# From previous step
myrdnorm <- function(n) {
  rdnorm(n, mean = mean, sd = sd)
}

# Set number of numbers to generate
n <- rep(1000, 20)

# Run an expression on each worker
clusterEvalQ(
  cl, {
    # Load extraDistr
    library(extraDistr)
    # Set mean to 10
    mean <- 10
    # Set sd to 5
    sd <- 5
})
## [[1]]
## [1] 5
## 
## [[2]]
## [1] 5
# Run myrdnorm in parallel
res <- clusterApply(cl, n, myrdnorm)

# Plot the results
plot(table(unlist(res)))

stopCluster(cl)
cl <- makeCluster(2)

# Set global objects on master: mean to 20, sd to 10
mean <- 20
sd <- 10

# Load extraDistr on workers
clusterEvalQ(cl, library(extraDistr))
## [[1]]
## [1] "extraDistr" "stats"      "graphics"   "grDevices"  "utils"     
## [6] "datasets"   "methods"    "base"      
## 
## [[2]]
## [1] "extraDistr" "stats"      "graphics"   "grDevices"  "utils"     
## [6] "datasets"   "methods"    "base"
# Export global objects to workers
clusterExport(cl, c("mean", "sd"))

# Run myrdnorm in parallel
res <- clusterApply(cl, n, myrdnorm)

# Plot the results
plot(table(unlist(res)))

stopCluster(cl)

Data chunks

  • Each tasks applied to different data
  • Data chunks are passed to workers as follows:
    1. Random numbers generated on the fly
    2. Passing chunks of data as argument
    3. Chunking on workers’ side

Foreach package

foreach(n = rep(5, 3), m = 10^(0:2), .combine = rbind) %do% rnorm(n, mean = m)
##                [,1]      [,2]        [,3]      [,4]      [,5]
## result.1   1.244865  1.787582  -0.8541951  2.159011  2.049638
## result.2   9.430163  9.357314  11.7989832  9.627906 12.259813
## result.3 100.986793 99.792234 100.0548519 99.511260 98.769947
# Combine by sum
foreach(n = rep(5, 3), m = 10^(0:2), .combine = '+') %do% rnorm(n, mean = m)
## [1] 110.2554 111.1479 111.2121 108.9436 109.4016

List comprehesion

foreach(x = sample(1:1000, 10), .combine = c) %:% 
  when(x %% 3 == 0 || x %% 5 == 0) %do% x
## [1] 774 300 168  81 849

Parallel backends

  • doParallel
    • Interface between foreach and parallel
    • Must register via registerDoParallel
library(doParallel)
## Warning: package 'doParallel' was built under R version 4.0.3
## Loading required package: iterators
## Warning: package 'iterators' was built under R version 4.0.3
cl <- makeCluster(3)
registerDoParallel(cl)
foreach(n = rep(5, 3)) %dopar% rnorm(n)
## [[1]]
## [1] -2.2044012  0.1888616 -0.3969126 -0.7311423 -0.6701238
## 
## [[2]]
## [1]  0.06700566 -1.20334937  1.58754833 -2.26882817 -1.29466430
## 
## [[3]]
## [1]  1.7145588  0.8485924 -1.0096851 -1.4032291  0.9140271
  • doFuture
    • Sequential, cluster, multicore, multiprocess
library(doFuture)
## Warning: package 'doFuture' was built under R version 4.0.3
plan(cluster, workers=3) # Or multicore
foreach(n = rep(5, 3)) %dopar% rnorm(n)
## [[1]]
## [1] -1.0231632 -0.1587720  1.0977022 -0.2206505  0.8896213
## 
## [[2]]
## [1] -0.8562071 -0.8623313  3.1194712 -1.3282737 -0.1906476
## 
## [[3]]
## [1]  0.5777461 -1.0121746  0.2986670 -0.8979932  0.7426261
# Function for doParallel foreach
freq_doPar <- function(cores, min_length = 5) {
    # Register a cluster of size cores
    registerDoParallel(cores = cores)
    
    # foreach loop
    foreach(let = chars, .combine = c, 
            .export = c("max_frequency", "select_words", "words"),
            .packages = c("janeaustenr", "stringr")) %dopar% 
        max_frequency(let, words = words, min_length = min_length)
}

# Run on 2 cores
freq_doPar(2)

future and future.apply

future * Uniform way to evaluate R expressions asynchronously * Provide a unified API for sequential and parallel processing of R expressions * Processing via a construct called future * An abstraction for value that may be available at some point in the future

Load balancing and scheduling

Instead of waiting for the results in a specific order, it accets results from the first worker that finished and immediatly sends a new tasks to that worker

Random numbers in parallel processing

# Create a cluster
cl <- makeCluster(2)

# Check RNGkind on workers
clusterCall(cl, RNGkind)
## [[1]]
## [1] "Mersenne-Twister" "Inversion"        "Rejection"       
## 
## [[2]]
## [1] "Mersenne-Twister" "Inversion"        "Rejection"
# Set the RNG seed on workers
clusterSetRNGStream(cl, 100)

# Check RNGkind on workers
clusterCall(cl, RNGkind)
## [[1]]
## [1] "L'Ecuyer-CMRG" "Inversion"     "Rejection"    
## 
## [[2]]
## [1] "L'Ecuyer-CMRG" "Inversion"     "Rejection"