library(parallel)
library(foreach)
## Warning: package 'foreach' was built under R version 4.0.3
library(future.apply)
## Warning: package 'future.apply' was built under R version 4.0.3
## Loading required package: future
## Warning: package 'future' was built under R version 4.0.3
library(future)
library(microbenchmark)
## Warning: package 'microbenchmark' was built under R version 4.0.3
# parallel support for big data
# library(sparklyr)
# library(iotools)
# library(pbdR)
# From previous step
mean_of_rnorm <- function(n) {
random_numbers <- rnorm(n)
mean(random_numbers)
}
# Repeat n_numbers_per_replicate, n_replicates times
n <- rep(10000, 50)
# Call mean_of_rnorm() repeatedly using sapply()
result <- sapply(
# The vectorized argument to pass
n,
# The function to call
mean_of_rnorm
)
# View the results
hist(result)
ar1_block_of_trajectories <- function(id, rate0 = 0.015, traj_len = 15, block_size = 10) {
trajectories <- matrix(NA, nrow = block_size, ncol = traj_len)
for (i in seq_len(block_size))
trajectories[i,] <- ar1_one_trajectory(unlist(ar1est[id, ]),
rate0 = rate0, len = traj_len)
trajectories
}
# From previous step
ar1_multiple_blocks_of_trajectories <- function(ids, ...) {
trajectories_by_block <- lapply(ids, ar1_block_of_trajectories, ...)
do.call(rbind, trajectories_by_block)
}
# Create a sequence from 1 to number of blocks
traj_ids <- seq_len(nrow(ar1est))
# Generate trajectories for all rows of the estimation dataset
trajs <- ar1_multiple_blocks_of_trajectories(
ids = traj_ids, rate0 = 0.015,
block_size = 10, traj_len = 15
)
# Show results
show_migration(trajs)
The current R session serves as the master process, while each worker is a separate R process
mcapply
mcmapply
mcMap
ncores <- detectCores(logical = FALSE)
n <- 100:1
# create a cluster of nodes, or a cluster of workers
cl <- makeCluster(ncores)
microbenchmark(Parallelized = clusterApply(cl, x = n, fun = rnorm, mean = 10, sd = 2),
Normal = lapply(n, rnorm, mean = 10, sd = 2))
## Unit: microseconds
## expr min lq mean median uq max neval
## Parallelized 13002.0 14890.60 16350.226 15995.45 17362.8 23354.3 100
## Normal 521.5 607.25 675.891 657.00 719.4 1156.2 100
clusterApply(cl, x = ncores:1, fun = rnorm)
## [[1]]
## [1] 0.4724191 1.7120917 0.7144003 0.5974406
##
## [[2]]
## [1] 0.6019177 -0.4272496 0.2712594
##
## [[3]]
## [1] 0.5464604 0.8454547
##
## [[4]]
## [1] -0.7838051
stopCluster(cl)
# Evaluate partial sums in parallel
cl <- makeCluster(ncores)
part_sums <- clusterApply(cl, x = c(1, 51),
fun = function(x) sum(x:(x + 49)))
# Total sum
total <- sum(unlist(part_sums))
# Check for correctness
total == sum(1:100)
## [1] TRUE
stopCluster(cl)
# Create a cluster and set parameters
cl <- makeCluster(2)
n_replicates <- 50
n_numbers_per_replicate <- 10000
# Parallel evaluation on n_numbers_per_replicate, n_replicates times
means <- clusterApply(cl,
x = rep(n_numbers_per_replicate, n_replicates),
fun = mean_of_rnorm)
# View results as histogram
hist(unlist(means))
stopCluster(cl)
Socket communication (dafault, all OS platforms)
cl <- makeCluster(ncores, type = "PSOCK")
Forking (not available for windows)
cl <- makeCluster(ncores, type = "FORK")
Using the MPI library (uses Rmpi)
cl <- makeCluster(ncores, type = "MPI")
# Make a cluster with 4 nodes
cl <- makeCluster(4)
# Investigate the structure of cl
str(cl)
## List of 4
## $ :List of 3
## ..$ con : 'sockconn' int 9
## .. ..- attr(*, "conn_id")=<externalptr>
## ..$ host: chr "localhost"
## ..$ rank: num 0
## ..- attr(*, "class")= chr "SOCKnode"
## $ :List of 3
## ..$ con : 'sockconn' int 10
## .. ..- attr(*, "conn_id")=<externalptr>
## ..$ host: chr "localhost"
## ..$ rank: num 1
## ..- attr(*, "class")= chr "SOCKnode"
## $ :List of 3
## ..$ con : 'sockconn' int 11
## .. ..- attr(*, "conn_id")=<externalptr>
## ..$ host: chr "localhost"
## ..$ rank: num 2
## ..- attr(*, "class")= chr "SOCKnode"
## $ :List of 3
## ..$ con : 'sockconn' int 12
## .. ..- attr(*, "conn_id")=<externalptr>
## ..$ host: chr "localhost"
## ..$ rank: num 3
## ..- attr(*, "class")= chr "SOCKnode"
## - attr(*, "class")= chr [1:2] "SOCKcluster" "cluster"
# What is the process ID of the workers?
clusterCall(cl, Sys.getpid)
## [[1]]
## [1] 14644
##
## [[2]]
## [1] 14548
##
## [[3]]
## [1] 20256
##
## [[4]]
## [1] 9484
# Stop the cluster
stopCluster(cl)
print_global_var <-
function() {print(a_global_var)}
# A global variable and is defined
a_global_var <- "before"
# Create a socket cluster with 2 nodes
cl_sock <- makeCluster(2, type = "PSOCK")
# Evaluate the print function on each node
tryCatch({
clusterCall(cl_sock, print_global_var)
},
error = function(e){
message("ERROR: global variables not found")
})
## ERROR: global variables not found
# Stop the cluster
stopCluster(cl_sock)
# A global variable and is defined
a_global_var <- "before"
# Create a fork cluster with 2 nodes
cl_fork <- makeCluster(2, type = "FORK")
# Evaluate the print function on each node
clusterCall(cl_fork, print_global_var)
# Stop the cluster
stopCluster(cl_fork)
clusterApply
: apply function in parallelclusterApplyLB
: ensures the work is distributed fairly among workers, where LB means Load balancedWrappers fo clusterApply
function * parApply
* parLapply
* parSapply
Work analogously to their sequentials counterparts apply
, lapply
, sapply
.
parRapply
parCApply
Are parallel row and column apply functions for matrices.
parLapplyLB
parSapplyLB
clusterApply(cl, x = arg.sequence, fun = myfunc)
: The length of the argument x determines the number of tasks that is sent to workers. c(1,2)
two tasks, c(1,2,3)
three tasks and so on..
Processing overhead
Things to consider
cl <- makeCluster(ncores, type = "PSOCK")
# Wrap this code into a function
mean_of_rnorm_sequentially <-function(n_numbers_per_replicate, n_replicates){
n <- rep(n_numbers_per_replicate, n_replicates)
lapply(n, mean_of_rnorm)
}
mean_of_rnorm_in_parallel <- function(n_numbers_per_replicate, n_replicates){
n <- rep(n_numbers_per_replicate, n_replicates)
clusterApply(cl, n, mean_of_rnorm)
}
# Set numbers per replicate to 5 million
n_numbers_per_replicate <- 5000000
# Set number of replicates to 4
n_replicates <- 4
# Run a microbenchmark
microbenchmark(
# Call mean_of_rnorm_sequentially()
sequential = mean_of_rnorm_sequentially(n_numbers_per_replicate, n_replicates),
# Call mean_of_rnorm_in_parallel()
parallel = mean_of_rnorm_in_parallel(n_numbers_per_replicate, n_replicates ),
times = 1,
unit = "s"
)
## Unit: seconds
## expr min lq mean median uq max neval
## sequential 1.5990750 1.5990750 1.5990750 1.5990750 1.5990750 1.5990750 1
## parallel 0.5822655 0.5822655 0.5822655 0.5822655 0.5822655 0.5822655 1
stopCluster(cl)
cl <- makeCluster(ncores, type = "PSOCK")
# Set numbers per replicate to 5 million
n_numbers_per_replicate <- 100
# Set number of replicates to 4
n_replicates <- 100
# Run a microbenchmark
microbenchmark(
# Call mean_of_rnorm_sequentially()
sequential = mean_of_rnorm_sequentially(n_numbers_per_replicate, n_replicates),
# Call mean_of_rnorm_in_parallel()
parallel = mean_of_rnorm_in_parallel(n_numbers_per_replicate, n_replicates ),
times = 1,
unit = "s"
)
## Unit: seconds
## expr min lq mean median uq max neval
## sequential 0.0035436 0.0035436 0.0035436 0.0035436 0.0035436 0.0035436 1
## parallel 0.0262385 0.0262385 0.0262385 0.0262385 0.0262385 0.0262385 1
stopCluster(cl)
Good practice: Master initializes workers at the beginning with everything that stays constant or/and is time consuming
Sending static data
loading libraries
eValuating global function
clusterCall
Evaluates the same function with the same arguments on all nodes.
cl <- makeCluster(2)
clusterCall(cl, function() library(dplyr))
## [[1]]
## [1] "dplyr" "stats" "graphics" "grDevices" "utils" "datasets"
## [7] "methods" "base"
##
## [[2]]
## [1] "dplyr" "stats" "graphics" "grDevices" "utils" "datasets"
## [7] "methods" "base"
stopCluster(cl)
clusterEvalQ
evaluates a literal expression on all nodes, which is a parallel equivalent of evalq()
function
cl <- makeCluster(2)
clusterEvalQ(cl, {
library(dplyr)
df <- tibble()
} )
## [[1]]
## data frame with 0 columns and 0 rows
##
## [[2]]
## data frame with 0 columns and 0 rows
stopCluster(cl)
clusterExport
Exports given objects from master to workers
expr <- "hi!"
cl <- makeCluster(2)
clusterExport(cl, "expr")
stopCluster(cl)
cl <- makeCluster(2)
# From previous step
myrdnorm <- function(n) {
rdnorm(n, mean = mean, sd = sd)
}
# Set number of numbers to generate
n <- rep(1000, 20)
# Run an expression on each worker
clusterEvalQ(
cl, {
# Load extraDistr
library(extraDistr)
# Set mean to 10
mean <- 10
# Set sd to 5
sd <- 5
})
## [[1]]
## [1] 5
##
## [[2]]
## [1] 5
# Run myrdnorm in parallel
res <- clusterApply(cl, n, myrdnorm)
# Plot the results
plot(table(unlist(res)))
stopCluster(cl)
cl <- makeCluster(2)
# Set global objects on master: mean to 20, sd to 10
mean <- 20
sd <- 10
# Load extraDistr on workers
clusterEvalQ(cl, library(extraDistr))
## [[1]]
## [1] "extraDistr" "stats" "graphics" "grDevices" "utils"
## [6] "datasets" "methods" "base"
##
## [[2]]
## [1] "extraDistr" "stats" "graphics" "grDevices" "utils"
## [6] "datasets" "methods" "base"
# Export global objects to workers
clusterExport(cl, c("mean", "sd"))
# Run myrdnorm in parallel
res <- clusterApply(cl, n, myrdnorm)
# Plot the results
plot(table(unlist(res)))
stopCluster(cl)
foreach(n = rep(5, 3), m = 10^(0:2), .combine = rbind) %do% rnorm(n, mean = m)
## [,1] [,2] [,3] [,4] [,5]
## result.1 1.244865 1.787582 -0.8541951 2.159011 2.049638
## result.2 9.430163 9.357314 11.7989832 9.627906 12.259813
## result.3 100.986793 99.792234 100.0548519 99.511260 98.769947
# Combine by sum
foreach(n = rep(5, 3), m = 10^(0:2), .combine = '+') %do% rnorm(n, mean = m)
## [1] 110.2554 111.1479 111.2121 108.9436 109.4016
foreach(x = sample(1:1000, 10), .combine = c) %:%
when(x %% 3 == 0 || x %% 5 == 0) %do% x
## [1] 774 300 168 81 849
doParallel
foreach
and parallel
registerDoParallel
library(doParallel)
## Warning: package 'doParallel' was built under R version 4.0.3
## Loading required package: iterators
## Warning: package 'iterators' was built under R version 4.0.3
cl <- makeCluster(3)
registerDoParallel(cl)
foreach(n = rep(5, 3)) %dopar% rnorm(n)
## [[1]]
## [1] -2.2044012 0.1888616 -0.3969126 -0.7311423 -0.6701238
##
## [[2]]
## [1] 0.06700566 -1.20334937 1.58754833 -2.26882817 -1.29466430
##
## [[3]]
## [1] 1.7145588 0.8485924 -1.0096851 -1.4032291 0.9140271
doFuture
library(doFuture)
## Warning: package 'doFuture' was built under R version 4.0.3
plan(cluster, workers=3) # Or multicore
foreach(n = rep(5, 3)) %dopar% rnorm(n)
## [[1]]
## [1] -1.0231632 -0.1587720 1.0977022 -0.2206505 0.8896213
##
## [[2]]
## [1] -0.8562071 -0.8623313 3.1194712 -1.3282737 -0.1906476
##
## [[3]]
## [1] 0.5777461 -1.0121746 0.2986670 -0.8979932 0.7426261
# Function for doParallel foreach
freq_doPar <- function(cores, min_length = 5) {
# Register a cluster of size cores
registerDoParallel(cores = cores)
# foreach loop
foreach(let = chars, .combine = c,
.export = c("max_frequency", "select_words", "words"),
.packages = c("janeaustenr", "stringr")) %dopar%
max_frequency(let, words = words, min_length = min_length)
}
# Run on 2 cores
freq_doPar(2)
future * Uniform way to evaluate R expressions asynchronously * Provide a unified API for sequential and parallel processing of R expressions * Processing via a construct called future
* An abstraction for value that may be available at some point in the future
Instead of waiting for the results in a specific order, it accets results from the first worker that finished and immediatly sends a new tasks to that worker
# Create a cluster
cl <- makeCluster(2)
# Check RNGkind on workers
clusterCall(cl, RNGkind)
## [[1]]
## [1] "Mersenne-Twister" "Inversion" "Rejection"
##
## [[2]]
## [1] "Mersenne-Twister" "Inversion" "Rejection"
# Set the RNG seed on workers
clusterSetRNGStream(cl, 100)
# Check RNGkind on workers
clusterCall(cl, RNGkind)
## [[1]]
## [1] "L'Ecuyer-CMRG" "Inversion" "Rejection"
##
## [[2]]
## [1] "L'Ecuyer-CMRG" "Inversion" "Rejection"