Install and load supporting libraries. We will load as we go.
## nodename
## "DZ2626UTPURUCKE"
## [1] "list of loaded packages: "
## [1] "stats" "graphics" "grDevices" "utils" "datasets" "methods"
## [7] "base"
amdahl_calc <- function(p, Ncpus, singlecoreT){
vec <- vector(mode="double", length = Ncpus)
for(i in 1:Ncpus){vec[i] = singlecoreT * (p + (1-p)/i)}
return(vec)
}
amdahl_calc(.01,64,100)
## [1] 100.000000 50.500000 34.000000 25.750000 20.800000 17.500000
## [7] 15.142857 13.375000 12.000000 10.900000 10.000000 9.250000
## [13] 8.615385 8.071429 7.600000 7.187500 6.823529 6.500000
## [19] 6.210526 5.950000 5.714286 5.500000 5.304348 5.125000
## [25] 4.960000 4.807692 4.666667 4.535714 4.413793 4.300000
## [31] 4.193548 4.093750 4.000000 3.911765 3.828571 3.750000
## [37] 3.675676 3.605263 3.538462 3.475000 3.414634 3.357143
## [43] 3.302326 3.250000 3.200000 3.152174 3.106383 3.062500
## [49] 3.020408 2.980000 2.941176 2.903846 2.867925 2.833333
## [55] 2.800000 2.767857 2.736842 2.706897 2.677966 2.650000
## [61] 2.622951 2.596774 2.571429 2.546875
Create figure for Amdahls calculations versus number of CPUs.
Ncpus = 64
cpus <- 1:Ncpus
mat_cpus <- rbind(cpus, cpus, cpus, cpus)
amdahl <- matrix(data = NA, nrow = 4, ncol = Ncpus)
singlecoreT <- 100
serialP <- c(0.01,0.2,0.5,0.9)
for(i in 1:length(serialP)){amdahl[i,cpus]<- amdahl_calc(serialP[i],Ncpus,singlecoreT)}
plot(mat_cpus, amdahl, col=c("red","blue", "green", "black"), main="Amdahls' Law", sub="serialP + (1-serialP)/cpus", ylim=c(0,100))
Profiling, lineprof now deprecated in favor of profvis, also not on CRAN yet.
The library rbenchmark will run a process multiple times to get distribution statistics on execution time.
library(rbenchmark)
benchmark(runif(100000),replications = 1)
## test replications elapsed relative user.self sys.self user.child
## 1 runif(1e+05) 1 0.01 1 0 0.02 NA
## sys.child
## 1 NA
benchmark(runif(100000),replications = 1)
## test replications elapsed relative user.self sys.self user.child
## 1 runif(1e+05) 1 0 NA 0 0 NA
## sys.child
## 1 NA
benchmark(runif(100000),replications = 1)
## test replications elapsed relative user.self sys.self user.child
## 1 runif(1e+05) 1 0 NA 0 0 NA
## sys.child
## 1 NA
benchmark(runif(100000),replications = 10)
## test replications elapsed relative user.self sys.self user.child
## 1 runif(1e+05) 10 0.04 1 0.04 0 NA
## sys.child
## 1 NA
benchmark(runif(100000),replications = 1000)
## test replications elapsed relative user.self sys.self user.child
## 1 runif(1e+05) 1000 4.71 1 4.51 0.2 NA
## sys.child
## 1 NA
The parallel package ships with R and does not need to be installed. parallel integrates the older (but still maintained) snow and multicore libraries. multicore uses a fork process not available on Windows.
library(parallel)
The parallel method detectCores will try to estimate the number of cores available (logical or physical). Logical cores (hyperthreaded) is the product of physical cores and number of threads per core. OS-dependent. Almost all physical CPUs will have 2 or more physical cores. Your mileage may vary with logical cores.
Ncoreslogical <- detectCores(logical = TRUE)
Ncoreslogical #Hyperthreaded
## [1] 32
Ncores <- detectCores(logical = FALSE)
Ncores
## [1] 16
The easiest approach is using the number of physical cores to create the number of worker processes. Then split your task into chunks equal to the number of workers that are roughly the same size. We implement callable function that will sleep for a given amount of time.
chill <- function(i){
function(x) Sys.sleep(i)
}
Then call it in a serial/non-parallel manner with lapply. system.time() reports user time (execution of functions), system time (user time plus memory overhead, disk access), and elapsed time (wall clock).
#serial
system.time(lapply(1:10, chill(1)))
## user system elapsed
## 0 0 10
The parallel package methods mclapply (mac/linux) and parLapply (windows) are straight-up replacements for apply functions.
#parallel
if(Sys.info()['sysname'] != "Windows"){
#mac/linux - uses a fork process that windows does not have
system.time(mclapply(1:10, chill(1), mc.cores = Ncores))
}else{
#windows
cluster <- makePSOCKcluster(Ncores)
system.time(parLapply(cluster, 1:10, function(i) Sys.sleep(2)))
stopCluster(cluster)
}
However, parallel applications can introduce overhead that can actually slow down the calculations. For example, functions that are already vectorized like finding the square root of all the numbers in a vector.
#serial
system.time({results <- lapply(1:100000, sqrt)})
## user system elapsed
## 0.06 0.00 0.06
#parallel
if(Sys.info()['sysname'] != "Windows"){
system.time({results <- mclapply(1:100000, sqrt, mc.cores = Ncores)})
}else{
cluster <- makePSOCKcluster(Ncores)
system.time(parLapply(cluster, 1:100000, sqrt))
stopCluster(cluster)
}
mclapply is great if your code fits into the apply framework, but sometimes your program is such that you may just want to work with loops. foreach is another parallel R library that comes with base R.
require("foreach")
## Loading required package: foreach
The doParallel package is an interface between the foreach package and the parallel package. In order to use foreach you need to register a back end that controls how the loop gets split up amongst the different available cores. For that, you need to either register your cores with doMC (mac/linux) or with doParallel (windows).
library(foreach)
if(Sys.info()['sysname'] != "Windows"){
require("doMC")
registerDoMC(Ncores)
}else{
require("doParallel")
cl <- makeCluster(2)
registerDoParallel(cl)
#snow is also an option
}
## Loading required package: doParallel
## Loading required package: iterators
Using foreach to building linear models from the iris data set with sampling in a for loop.
#View(iris)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- 10000
print("parallel time =")
## [1] "parallel time ="
system.time({
r <- foreach(icount(trials), .combine=cbind) %dopar% {
ind <- sample(100, 100, replace = TRUE)
result1 <- glm(x[ind,2]~x[ind,1],family=binomial(logit))
coefficients(result1)
}
})
## user system elapsed
## 3.96 1.00 18.10
Note the use of %do% instead of %dopar% to run in a serial manner.
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- 10000
print("serial time =")
## [1] "serial time ="
system.time({
r <- foreach(icount(trials), .combine=cbind) %do% {
ind <- sample(100, 100, replace = TRUE)
result1 <- glm(x[ind,2]~x[ind,1],family=binomial(logit))
coefficients(result1)
}
})
## user system elapsed
## 37.19 0.01 37.22
Be careful about modifying shared vectors, foreach modifies copies of the original vector and therefore does not return what you might expect.
#serial
x_serial <- c(0,0,0,0,0)
for(i in 1:5){
x_serial[i] = i*2
}
x_serial
## [1] 2 4 6 8 10
#parallel
x_parallel <- c(0,0,0,0,0)
foreach(i = 1:5) %dopar%{
x_parallel[i] = i * 2
}
## [[1]]
## [1] 2
##
## [[2]]
## [1] 4
##
## [[3]]
## [1] 6
##
## [[4]]
## [1] 8
##
## [[5]]
## [1] 10
x_parallel
## [1] 0 0 0 0 0
For this reason, foreach has a combine function can return the reconstituted vector. The array assignment is moved outside of the foreach loop.
#parallel foreach with combine
x_cparallel <- c(0,0,0,0,0)
x_cparallel <- foreach(i = 1:5, .combine=c) %dopar%{
i * 2
}
x_cparallel
## [1] 2 4 6 8 10
Prescheduling is another argument that can result in big speed increases for parallelized foreach loops. We return to our square root of 10000 integers problem, which was actually faster to run in the serial manner than with using mclapply because the serial execution is vectorized and the parallel implemenation adds some overhead to the calculations.
#serial
system.time({results <- lapply(1:10000, sqrt)})
## user system elapsed
## 0 0 0
#parallel
if(Sys.info()['sysname'] != "Windows"){
system.time({results <- mclapply(1:10000, sqrt, mc.cores = Ncores)})
}else{
cluster <- makePSOCKcluster(Ncores)
system.time(parLapply(cluster, 1:10000, sqrt))
stopCluster(cluster)
}
Adding a prescheduling argument to the foreach loop adds a big reduction in the overhead associated with the parallel calculation. First, using foreach with no prescheduling:
system.time({
result <- foreach(x = 1:10000, .options.multicore=list(preschedule=FALSE)) %dopar% {
sqrt(x)
}
})
## user system elapsed
## 4.61 0.51 5.13
But setting prescheduling to TRUE can allow for nearly an order of magnitude improvement:
system.time({
result <- foreach(x = 1:10000, .options.multicore=list(preschedule=TRUE)) %dopar% {
sqrt(x)
}
})
## user system elapsed
## 4.40 0.52 4.92
However, both of these approaches are much slower than the simple vectorized version of the sqrt function. Prescheduling can still be slower if the variability in the individual jobs is high. For these situations, you might want to look at the use of load balancing and queuing servers. The simplest way to do this is to increase the number of jobs relative to the worker then queue then up so that as a worker finishes a task it gets the next one in line.
Load wine dataset.
#tom epa windows dell precision t7610
if(Sys.info()[4]=="DZ2626UTPURUCKE"){
wine <- read.csv("k:/git/parallelR/winequality-red.csv", sep=";", header=TRUE)
}
#tom laptop
if(Sys.info()[4]=="stp-air.local"){
vpdir<-path.expand("~/git/beeRpop/")
}
#View(head(wine))
str(wine)
## 'data.frame': 1599 obs. of 12 variables:
## $ fixed.acidity : num 7.4 7.8 7.8 11.2 7.4 7.4 7.9 7.3 7.8 7.5 ...
## $ volatile.acidity : num 0.7 0.88 0.76 0.28 0.7 0.66 0.6 0.65 0.58 0.5 ...
## $ citric.acid : num 0 0 0.04 0.56 0 0 0.06 0 0.02 0.36 ...
## $ residual.sugar : num 1.9 2.6 2.3 1.9 1.9 1.8 1.6 1.2 2 6.1 ...
## $ chlorides : num 0.076 0.098 0.092 0.075 0.076 0.075 0.069 0.065 0.073 0.071 ...
## $ free.sulfur.dioxide : num 11 25 15 17 11 13 15 15 9 17 ...
## $ total.sulfur.dioxide: num 34 67 54 60 34 40 59 21 18 102 ...
## $ density : num 0.998 0.997 0.997 0.998 0.998 ...
## $ pH : num 3.51 3.2 3.26 3.16 3.51 3.51 3.3 3.39 3.36 3.35 ...
## $ sulphates : num 0.56 0.68 0.65 0.58 0.56 0.56 0.46 0.47 0.57 0.8 ...
## $ alcohol : num 9.4 9.8 9.8 9.8 9.4 9.4 9.4 10 9.5 10.5 ...
## $ quality : int 5 5 5 6 5 5 5 7 7 5 ...
x <- wine[,1:11]
y <- wine$quality
Random forests.
#install.packages("randomForest")
library(randomForest)
## randomForest 4.6-12
## Type rfNews() to see new features/changes/bug fixes.
Ntrees = 500
system.time({
randomForest(y=y,x=x,ntree=Ntrees)
})
## user system elapsed
## 3.72 0.04 3.74
With the foreach package.
trees_per_core = floor(Ntrees / Ncores)
system.time({
wine_model <- foreach(trees=rep(trees_per_core, Ncores), .combine=combine,
.multicombine=TRUE, .packages='randomForest') %dopar% {randomForest(
y = y, x = x, ntree = Ntrees)
}
})
## user system elapsed
## 2.27 0.70 30.89
#library("devtools"); install_github("lme4/lme4",dependencies=TRUE) # for caret
library(caret)
## Loading required package: lattice
## Loading required package: ggplot2
library(mlbench)
library(e1071)
data(Sonar)
inTrain <- createDataPartition(y = Sonar$Class, p = .75, list = FALSE)
training <- Sonar[ inTrain,]
testing <- Sonar[-inTrain,]
ctrl <- trainControl(method = "repeatedcv", number = 8, repeats = 8)
grid_rf <- expand.grid(.mtry = c(2, 3, 4))
system.time({
rf <- train(Class ~ ., data = training, method = "rf", trControl = ctrl, ntree=750, tuneGrid = grid_rf)
})
## user system elapsed
## 1.25 0.14 37.68
We have to reregister with only 1 core to see how this model performs in a serialized manner.
if(Sys.info()['sysname'] != "Windows"){
require("doMC")
registerDoMC(1)
}else{
require("doParallel")
cl <- makeCluster(2) #problem
registerDoParallel(cl)
#snow is also an option
}
system.time({
rf <- train(Class ~ ., data = training, method = "rf", trControl = ctrl, ntree=750, tuneGrid = grid_rf)
})
## Warning: closing unused connection 6 (<-DZ2626UTPURUCKE.aa.ad.epa.gov:
## 11993)
## Warning: closing unused connection 5 (<-DZ2626UTPURUCKE.aa.ad.epa.gov:
## 11993)
## user system elapsed
## 1.32 0.10 38.68
#reset
if(Sys.info()['sysname'] != "Windows"){
require("doMC")
registerDoMC(Ncores)
}else{
require("doParallel")
cl <- makeCluster(2)
registerDoParallel(cl)
#snow is also an option
}
library(ParallelForest)
Be careful about random number generation.
On Mac run activity monitor and double click on cpu load to get a graphic of core activity.