在雲端運算環境使用R和MPI

前言

最近大數據成為顯學,人人都在談論大數據,而且大部分的人都在運用Hadoop來處理大數據。

但是如果要做機器學習,或是模型配適時,常常需要在資料上執行疊代(iteration)演算法。由於Hadoop是在硬碟上運作的系統,所以在疊代演算法的表現並不好。如果能夠將資料全部裝進記憶體中,那可以大大增加運算效能。

如何獲取足夠的記憶體

原本記憶體很貴,所以一般人無法把太大的數據放入記憶體中分析。五年前想要對容量高達數GB的資料做機器學習或模型配適,對於一般人來說太昂貴了。但是現在要將GB等級,甚至是TB等級的數據放在記憶體做分析,所需要的費用已是可負擔的。

由於硬體的進步,現在要使用大記憶體的機器已經相對簡單:一種方法是花數十萬買一台大記憶體的電腦。但是購買電腦的問題在於,沒辦法先試用。如果花了數十萬才發現買來的電腦還是不符合需求,是不是有點尷尬呢?

另外一種方法就是租用機器。今日已經有許多服務可以租用機器了,而最有名的就是Amazon Web Service。目前AWS已提供我們租用大記憶體的機器了(AWS Instance Type),搭配上AWS的Spot Instance服務,目前一台68.4G記憶體的電腦租用下來只需約0.5美元/小時。

如果要使用1TB等級的記憶體,我們可以把便宜的機器串起來,來增加記憶體。但是這樣付出的代價就是要撰寫比較複雜的程式,以及要承擔網路延遲。MPI(Message Passing Interface)是一個比Hadoop歷史更悠久的計算協定。透過MPI的實作,如OpenMPI,我們可以把若干台電腦串起來,協同工作,解決大數據的問題。MPI是除了Hadoop之外,另一種可以撰寫MapReduce式演算法的工具。

MPI 仍然是一種可用的解決方案

過去MPI主要是用於解決程序間通訊問題,而且許多MPI的程式要用C來撰寫,導致開發效率 低落。現在MPI可以在許多更高階的語言中使用,例如R就有套件讓開發者在R中使用MPI,以提升開發效率。

MPI另一個讓人卻步的因素是設定繁瑣,而且需要環境一致的運算環境。今日由於雲端運算的進步,我們可以方便的在各式供應商,如AWS(Amazon Web Service)上租用數十台,甚至是數百台環境一模一樣的虛擬機器。因此這部分的問題在雲端環境上已容易解決。

Hadoop社群現在正在發展在記憶體上做運算的系統,如:Spark。但是這些工具仍然在開發中,而且overhead不小,需要使用非常大量的電腦才會有好的效能。我相信很多讀者手上數據不小,但是也沒有大到要用Hadoop。那麼,本篇文章介紹的MPI + R 的工具可能就是讀者所需要的。

R, 最火紅的分析工具

R 則是目前火紅的Open Source分析工具。根據如KDnuggets的調查,R 也是資料科學家(Data Science)中最熱門的分析工具。R 可以提供和matlab同等級的功能,並且已經有破5000個第三方套件來擴充它的分析演算法。

在這篇文章中,筆者將介紹如何在R利用MPI和AWS,並且Demo一個以22個Instance,在數分鐘內對上億筆資料做羅吉斯迴歸(Logistic Regression)的例子。

然而,當資料量大到需要數百台電腦來作處理時,筆者相信Hadoop仍然是比較好的解決方案。但是當資料量超過一台電腦,且未達數百台電腦才能處理的規模,我們可以考慮使用本文提及的解決方案。如果資料沒這麼大,企業是不必用Hadoop的(Don't use Hadoop - your data isn't that big)。 (譯文)

普遍來說,數據在每次處理後都是會更精粹,量也會變小。本篇的方法並不適用於處理初始資料,而是有前處理過後的資料。以筆者的經驗,資料經過前處理後應當可以降低十倍甚至百倍的空間。

R 和 MPI

在過去,R就已經能運用MPI做分散式運算了。最常見的底層套件就是Rmpi。基於Rmpi之上的還有snowforeach等比較高階的運算套件。

pbdR: Programming with Big Data in R是2012年的專案,它以MPI為底層,並且設計成讓使用者以SPMD架構來撰寫分散式演算法(pbdMPI)。同時他也提供好用的資料結構(如pbdDMAT分散式矩陣和向量)和測量運算時間(pbdPROF)等工具。

以筆者的經驗,在雲端運算環境之下,pbdMPI可能是R中最好的工具。若讀者有任何指教,非常歡迎來信建議

更完整的介紹,請參考High-Performance and Parallel Computing with R

在AWS架設MPI 環境

這裡筆者將簡單介紹如何快速的在AWS預設的ubuntu 12.04 instance上架設MPI環境。

使用Public AMI

筆者有在Tokyo釋出AMI(ami-4f543b4e),想要快速嘗試的讀者可以直接用AMI來建立虛擬機器,這樣就可以省略許多設定步驟。快速的設定方法請參考影片:

想要學習架設環境的讀者可以參考附錄中的說明。

Logistic Regression簡介

Logistic Regression 是一種supervised learning的方法,也是迴歸分析用於類別形變數的一種變形,目前在許多領域上有許多應用。例如在雅虎挑戰Google龍頭地位的新秘密武器:柏克萊博士開發比Hadoop更強的新系統Spark中就有提到logistic regression被用來從數據中找到模式。筆者限於能力,只能簡單地介紹Logistic Regression,有興趣的讀者可以參考這篇很棒的文章:Maximum Likelihood, Logistic Regression and Stochastic Gradient Training

簡單來說,Logistic Regression是尋找\( y_i \)發生TRUE或FALSE機率和\( x_i \)之間的關係。為了避免發生機率值小於0或大於1的不合理現象,所以會再用logit函數把機率值和整條實數線做連結。

plot of chunk logit

一種Logistic Regression的表示式為:

\[ P(y_i = \pm 1 | x_i, w) = \frac{1}{1 + e^{- y_i (w^T x_i) }} \]

透過Maximum Likelihood和$L_2$Regularization,我們可以算出要做最佳化的目標函數\( f \)、Gradient \( \nabla f \)和Hassian \( \nabla^2 f \)。

計算上,我們要找到能夠最小化\( f \)的\( w \)。

最佳化

使用者也可以挑選任何計算最佳化的函式庫,要注意的是若參數很多(\( w \)很長)的時候,函式庫建議要避開直接計算\( \nabla^2 f(w) \)這個矩陣的方法。筆者不是最佳化的專家,找了幾個R內建的方式都跑不完,只好抽出LIBLINEAR中的trusted region演算法來用囉。抽出來的核心已經包成一個R套件,請讀者在R中安裝Rcpp之後在shell執行:

wget https://bitbucket.org/wush978/largescalelogisticregression/get/hstrust.zip
unzip hstrust.zip
R CMD INSTALL wush978-largescalelogisticregression-4daf9c5bba5c

應該就可以安裝了。

Data and Scaling

這裡以有Big Data的iris之稱的airline資料為例。這裡有自1987至2008年的飛行資料。

假設我們要研究飛機有無Delay和起飛的機場有無關係,就可以運用剛剛介紹的Logistic Regression來做分析。又為了要試試看剛剛介紹的pbdMPI,我們就直接依照上述的介紹,在AWS上開22台電腦,每台電腦來處理一年的資料,來試跑吧!

ps. 整趟試跑可能會花費你個位數美元左右的金額。

Download Data

首先我們先來下載airline的Data吧。為了嘗鮮,就讓我們用pbdMPI來下載。

pbdMPI所採用的SPMD模型,就是要讓22台機器都來執行同樣的程式碼檔案。這也是為什麼Master node要用samba來開網路共享,這樣我們才能在master上編輯script後,能夠自動讓所有的node看到。

如果使用筆者提供的AMI,則相關檔案都已經放在~/pbdMPI-Demo之下了。

(~/pbdMPI-Demo/download.R)

library(pbdMPI)

init()

url <- sprintf("http://stat-computing.org/dataexpo/2009/%d.csv.bz2", (1987:2008)[comm.rank() + 1])
download.file(url, "~/data.bz2")

finalize()

這裡init就是要啓動底層mpi的communicators, 而finalize則是要終止mpi communicators,請記得在退出R程序之前finalize,否則openmpi會直接強制執行所有串起來的R程序的。

中間運用到的comm.rank會依照機器在~/pbdMPI.conf的順序,從0開始回報R程序的序號。運用這個序號,就可以讓22台機器各自下載對應的airline dataset。

將檔案儲存好之後(記得放在Repository資料夾之後,才能讓所有電腦看到,或是使用~/pbdMPI-Demo內的檔案),執行:

mpirun -np 22 --hostfile ~/pbdMPI.conf Rscript xxx.R

22台電腦就會執行上面的程式碼,大家一起init,一起進入邏輯判斷if(comm.rank() != 0)。而且每台電腦抓取的資料都不同,達到分工合作的效果。最後再大家一起finalize。這裡,22個R都會等到大家都執行到這之後,再各自離開程序。

這就是一個最簡單的SPMD的應用範例。

Rds

由於下載下來的格式是.bz的壓縮檔案,我們先將格式轉成R原生的儲存格式以加快後續載入資料的速度:

(~/pbdMPI-Demo/bz2rds.R)

suppressPackageStartupMessages(library(pbdMPI))

init()

data.src <- bzfile("~/data.bz2")
airline <- read.csv(data.src, stringsAsFactors = FALSE)
saveRDS(airline, "~/data.Rds")
barrier()
comm.print("Finish!")
finalize()

剛剛是不是覺得套件載入訊息很洗螢幕呢?我們可以用suppressPackageStartupMessages來隱藏載入訊息。

這裡我又多使用了barriercomm.print兩個函數。barrier是一個作同步的函數,它會確保22個R instances都執行到這一行之後,才會再往下執行。

comm.print是一個很方便的函數,只會讓特定的node(預設是rank 0)印訊息到stdout,畫面才不會像之前的範例這麼鬧哄哄的。

這個函數很消耗時間,所以大家可以想像如果只用一台電腦做這些事情,要等多久。

Count Instances

接著讓我們來看一下每年各有有多少筆資料吧!

(~/pbdMPI-Demo/count.R)

suppressPackageStartupMessages(library(pbdMPI))
init()

airline <- readRDS("~/data.Rds")
n <- gather(nrow(airline))
comm.print(n)

finalize()

大家應該已經熟悉SPMD的模式了,也可以看懂前面就是讓所有node讀取好不容易存好的Rds檔案。

gather函數會將指定的物件全部依序集中到一個特定的node,預設是rank 0。所以我們可以看到最後n就成為一個integer vector,依序代表自1987年到2008年的資料個數。

gather這類函數就是MPI的賣點。MPI提供了許多API供Process之間傳遞訊息,讓程式設計師可以寫出平行化的程式。

另外這裡master的gather(0L, ...中的0L是為了保持通訊的效能。因為nrow(airport)是整數,0L會讓所有人傳遞的訊息是一致的,而不用做轉型。gather也可以指定buffer,也就是被傳遞的物件暫時儲存的地方,這可以加快pbdMPI通訊的速度。然而使用buffer也要很小心,如果傳遞的物件形態不一致時:

buffer <- integer(23)
if (comm.rank() != 0) {
  n <- gather(norw(iris), b)
} else {
  n <- gather(0.0, b)
}

就準備看C API的錯誤訊息吧!!

Error in gather(0, b) : 
  REAL() can only be applied to a 'numeric', not a 'integer'
Calls: gather -> gather -> .Call
Execution halted

pbdMPI除了提供方便的API(會自動處理形態問題)給一般的R使用者之外,也提供讓熟悉R底層物件資料結構的使用者,寫出進階語法的空間。

所以最終呢,我們應該會看到:

COMM.RANK = 0
[[1]]
[1] 1311826

[[2]]
[1] 5202096

[[3]]
[1] 5041200

...

gather預設是把東西裝到list之中,使用者想要直接壓成如integer vector的話,可以加上unlist = TRUE這個參數。

Sum Instances

接著讓我們來計算22年來總共有多少筆資料。什麼?直接把剛剛印出來的結果加總?那不有趣啊,讓我們用22台電腦一起算比較熱鬧,也可以趁機再學一個函數。

(~/pbdMPI-Demo/sum.R)

suppressPackageStartupMessages(library(pbdMPI))
init()

airline <- readRDS("~/data.Rds")
n <- reduce(nrow(airline), op="sum")
comm.print(n)

finalize()

這裡要介紹的是reduce,當所有R instance執行reduce之後,大家就會將第一個參數物件匯整到master,master再依照op的指示來整理匯整後的資料。也就是說,reducegather多了一個動作,而這個動作可以透過op來控制。

所以這裡的程式碼讓每個slaves去數自己手上資料有幾筆,然後回報給master後,master再做加總。

結果應該是:

COMM.RANK = 0
[1] 123534969

Training

接著讓我們來分析一下,到底飛機誤點和起飛的機場有沒有關係呢?

我們先丟著執行,再來講解。

(~/pbdMPI-Demo/training.R)

#! /usr/bin/Rscript

# __author__ = "Wush Wu"
# __copyright__ = "© 2013, Wush Wu"
# __license__ = "GPL 3.0"

suppressPackageStartupMessages(library(pbdMPI))
suppressPackageStartupMessages(library(Matrix))

start.time.all <- start.time <- Sys.time()

init()

data <- readRDS("~/data.Rds")
print(sprintf("nrow(data): %d (from %d)", nrow(data), (1987:2008)[comm.rank()]))
barrier()
comm.print(sprintf("Loading time: %0.2f secs", difftime(Sys.time(), start.time, units="secs")))
start.time <- Sys.time()

origin.airport <- unique(data$Origin)
origin.airport <- allgather(origin.airport, unlist=TRUE)
origin.airport <- unique(origin.airport)
comm.print(origin.airport)
invisible(gc())
comm.print("encoding training data...")
train.data <- data.frame(
  y = (data$ArrDelay > 60),
  origin = factor(data$Origin, levels=origin.airport)
)
rm(data)
invisible(gc())
comm.print(sprintf("Encoding time: %0.2f secs", difftime(Sys.time(), start.time, units="secs")))

start.time <- Sys.time()
comm.print("constructing model matrix...")
X <- sparse.model.matrix(y ~ origin, train.data)
y <- train.data$y[!is.na(train.data$y)]
X.size <- reduce(object.size(X), op="sum")
y.size <- reduce(object.size(y), op="sum")
comm.print(sprintf("Encoded data needs about %f GB", (y.size + X.size) / 2^30))
comm.print(sprintf("Constructing time: %0.2f secs", difftime(Sys.time(), start.time, units="secs")))
start.time <- Sys.time()

x.name <- colnames(X)
x.name.list <- allgather(x.name, unlist=FALSE)
stopifnot(
  all(sapply(1:length(x.name.list), function(i) all.equal(x.name.list[[1]], x.name.list[[i]])))
)
x.name <- x.name.list[[1]]

sigma <- function(x) {
  1/(1 + exp(-x))
}

mpi_fun <- function(w) {
    regularization <- sum(w^2)/2
    retval <- sum(log(1 + exp(ifelse(y, -1, 1) * as.vector(X %*% w))))
    retval <- allreduce(as.numeric(retval), op = "sum")
    retval + regularization
}

mpi_grad <- function(w) {
    regularization <- w
    y.value <- ifelse(y, 1, -1)
    x <- y.value * as.vector(X %*% w)
    d <<- sigma(x) * (1 - sigma(x))
    retval <- as.vector(((sigma(x) - 1) * y.value) %*% X)
    retval <- allreduce(retval, op="sum")
    retval + regularization
}

mpi_Hs <- function(unused, w) {
    regularization <- w
    retval <- as.vector(t(d * (X %*% w)) %*% X)
    retval <- allreduce(retval, op="sum")
    retval + w
}

w.size <- ncol(X)
stopifnot(require(HsTrust))
obj <- new(HsTrust, mpi_fun, mpi_grad, mpi_Hs, length(x.name))
r <- obj$tron(0.001, TRUE)

comm.print(sprintf("Training time: %0.2f secs", difftime(Sys.time(), start.time, units="secs")))
total.time <- as.numeric(difftime(Sys.time(), start.time.all, units="secs"))
comm.print(sprintf("Total time: %0.2f", total.time))

save(r, x.name, file="~/r.Rdata")
finalize()

讀者也可以先只開兩個node:

mpirun -np 2 --hostfile ~/pbdMPI.conf Rscript xxx.R

這樣可以先只針對1987年的資料很快的跑過一遍,可以快速的檢查有沒有bug。

上述的程式碼做了以下的事情:

整合所有機場

由於各年所看到的機場不一定一致,而不一致會導致logistic regression的結果不具有意義,所以我們要先整和這部分。好在R內建有許多好用的工具,如uniquefactor可以幫助我們對資料作編碼。

編碼的意思就是,把各個機場:

  [1] "SAN" "SFO" "BUR" "OAK" "LAX" "PHX" "SJC" "LAS" "SNA" "SMF" "ABQ" "MFR"
...

依序對應到1, 2, 3, … 等整數。這個對應關係,務必要所有電腦都一致,所以才會有:

Model Matrix 和 formula

由於是類別形變數,所以我們使用Sparse Matrix來做Model matrix以節省記憶體。依據編碼結果,可以透過sparse.model.matrix來產生的model matrix:

y ~ origin是R在做regression-like modeling的時候常用於表示數學式關係的R物件,又叫做formula。這裡y ~ origin的意思就是:

\[ y = a + b_{origin} * origin + \varepsilon \]

光有關係還不夠,還要給資料來源,那就是第二個參數:train.data。類似的語法可以在lmglmmodel.matrix,甚至是plotaggregate等函數都可以用formula。這是非常方便的一個工具。

X <- sparse.model.matrix(y ~ origin, train.data)

保險起見,檢查大家編碼後的結果是否一致:

x.name <- colnames(X)
x.name.list <- allgather(x.name, unlist=FALSE)
stopifnot(
  all(sapply(1:length(x.name.list), function(i) all.equal(x.name.list[[1]], x.name.list[[i]])))
)
x.name <- x.name.list[[1]]

筆者是用colnames(X)來檢驗,應該每台電腦上算出來的都要一致。stopifnot是一個常用的檢驗函數,只要裡面的參數有FALSE,R就會大叫錯誤,然後直接關掉。因為沒有呼叫finalize的關係,openmpi會毫不留情的把其他所有R程序,全部殺光!

定義mpi_fun, mpi_gradmpi_Hs

來到重頭戲了。

讓我們來復習一下剛剛寫的數學式子:

但是現在呢,所有的資料,也就是\( X = \left[\begin{array}{c}x_1^T \\ x_2^T \\ \vdots \\ x_n^T \end{array}\right] \),已經被讀者們無情的拆成22份了:\( X = \left[\begin{array}{c}X_1^T \\ X_2^T \\ \vdots \\ X_22^T \end{array}\right] \),\( y \)也同時被分屍了。

那每台電腦要如何各自處理手上的資料,以及最後master怎麼整合大家各自算出來的結果呢?

\( f \)

其實\( f \)就是\( x_i \), \( y_i \)各自算,再加總,所以很直接:

mpi_fun <- function(w) {
    regularization <- sum(w^2)/2
    retval <- sum(log(1 + exp(ifelse(y, -1, 1) * as.vector(X %*% w))))
    retval <- allreduce(as.numeric(retval), op = "sum")
    retval + regularization
}

R在學術界受到歡迎的其中一個理由就是,可以用近似數學式子的方式算出結果,看看sum(log(1 + exp(ifelse(y, -1, 1) * as.vector(X %*% w)))),不知道讀者是否同意呢?

\( \nabla f \)

\( \nabla f: \mathbb{R}^p \rightarrow \mathbb{R}^p \)

所以這次大家要算的是和參數一樣長的向量了。仔細看看算式:

\( \nabla f(w) = w + \sum_{i=1}^n{(\frac{1}{1 + e^{-y_i w^T x_i}} - 1)y_i x_i} \)

其實也和\( f \)一樣,大家各自算出\( w_i \in \mathbb{R}^p \)之後,再各自交換就可以了。

mpi_grad <- function(w) {
    regularization <- w
    y.value <- ifelse(y, 1, -1)
    x <- y.value * as.vector(X %*% w)
    d <<- sigma(x) * (1 - sigma(x))
    retval <- as.vector(((sigma(x) - 1) * y.value) %*% X)
    retval <- allreduce(retval, op="sum")
    retval + regularization
}

值得一提的是,從LIBLINEAR的實作中的小撇步:d <<- sigma(x) * (1 - sigma(x))。這等講完Hessian後再解釋。

\( \nabla^2 f \)

Hessian其實是個 \( p \times p \) 的矩陣,當資料量大,而且feature也大的時候,這個Hessian通常會拖累效能。而LIBLINEAR的Trusted Region的實作,並不需要直接算出\( \nabla^2 f(w) \), 而是以\( \nabla^2 f(w)s \)取代。而且這裡的\( w \),一定和前面呼叫\( mpi_grad \)的\( w \)一致。

仔細看\( \nabla^2 \)的算式:

讀者有沒有注意到,唯一和\( w \)有關的就是\( D_{ii} \)?由於Trusted Region的實作的特性,所以我們把\( D_{ii} \)的計算移動到mpi_grad之中,也就是剛剛提到的:d <<- sigma(x) * (1 - sigma(x))

而這裡的拆解,就是慢慢把

\[ \left[X_1^T, X_2^T\right]D\left[\begin{array}{c}X_1 \\ X_2 \end{array}\right] s \]

展開!

最終我們會得到,每個電腦就只要計算\( X_i^T D_i X_i s \),最後再互相交換並加總起來就可以了。

所以就寫成:

mpi_Hs <- function(unused, w) {
    regularization <- w
    retval <- as.vector(t(d * (X %*% w)) %*% X)
    retval <- allreduce(retval, op="sum")
    retval + w
}

\( X_i^T D_i X_i s \)的計算就是t(d * (X %*% w)) %*% X,請讀者注意,這裡的w就是數學式中的\( s \)。

最佳化的流程

基本上,Trusted Region的部分,也就是算難的部分,筆者完全交給LIBLINEAR的核心去跑。也就是說,由這個核心決定什麼時候要算mpi_funmpi_gradmpi_Hs,而計算的參數w,也一切由它說了算。

所以整個流程就是:

而每個node就是聽從自己機器上的Optimization Kernel來呼叫函數:

  1. 依照自己的資料計算結果
  2. 運用allreduce彼此交換結果
  3. 總結出整體資料的計算結果

計算的過程中,讀者應該會看到:

iter  1 act 6.290e+05 pre 5.622e+05 delta 2.101e+00 f 8.930e+05 |g| 6.073e+05 CG   1
iter  2 act 6.752e+04 pre 5.666e+04 delta 2.101e+00 f 2.640e+05 |g| 1.312e+05 CG   1
...

這就是告訴讀者,LIBLINEAR的核心已經疊代了兩次,第一次的mpi_fun的值是\( 8.93 \times 10^5 \),mpi_grad的結果的向量長度是\( 6.073 \times 10^5 \),而Trusted Region算mpi_Hs算了1次(CG 後面的數字)以後才找到下一個w

結果:

以下是筆者過去用跑出來的結果:

[1] "nrow(data): 1311826 (from 1987)"
NULL
COMM.RANK = 0
[1] "Loading time: 6.10 secs"
COMM.RANK = 0
  [1] "SAN" "SFO" "BUR" "OAK" "LAX" "PHX" "SJC" "LAS" "SNA" "SMF" "ABQ" "MFR"
 [13] "SCK" "MRY" "TUS" "EUG" "SEA" "RDM" "PDX" "RNO" "ONT" "CCR" "FAT" "LGB"
 [25] "PSC" "YKM" "BLI" "GEG" "JFK" "STL" "HNL" "MIA" "SJU" "DEN" "CVG" "DCA"
 [37] "DTW" "SYR" "LGA" "BOS" "PHL" "TPA" "MCO" "MKE" "IAD" "CMH" "ORD" "PIT"
 [49] "EWR" "HOU" "SAT" "DAY" "IND" "FLL" "BNA" "CLE" "DFW" "BWI" "ORF" "COS"
 [61] "MCI" "LIT" "TUL" "BDL" "SLC" "SDF" "IAH" "JAX" "PSP" "ANC" "MSY" "OMA"
 [73] "RSW" "SRQ" "ICT" "ATL" "MDW" "AUS" "MSP" "PBI" "OKC" "MLI" "MSN" "CLT"
 [85] "DSM" "RDU" "FSD" "PIA" "SGF" "LEX" "CMI" "CID" "SUX" "TOL" "LNK" "MDT"
 [97] "ALO" "RST" "MEM" "OGG" "FAI" "KOA" "ROC" "MBS" "LIH" "SBA" "ALB" "GSO"
[109] "GRR" "BIL" "BHM" "CAE" "MHT" "ELP" "TYS" "JAN" "BFL" "HSV" "SAV" "BGR"
[121] "PWM" "ABE" "BOI" "CAK" "GTF" "BUF" "CPR" "BTV" "ISP" "RIC" "CHS" "PVD"
[133] "RAP" "CRW" "FAR" "HPN" "FOE" "ILM" "RDD" "LMT" "ACV" "ILG" "DAL" "LBB"
[145] "AMA" "CRP" "HRL" "MAF" "TLH" "GSP" "PNS" "MOB" "AVP" "GNV" "STT" "STX"
[157] "DAB" "MLB" "DRO" "GJT" "PUB" "GCN" "FLG" "YUM" "GRB" "AZO" "ERI" "FWA"
[169] "BIS" "MOT" "GFK" "VPS" "BZN" "DLH" "LSE" "EAU" "ATW" "SBN" "LAN" "MSO"
[181] "MGM" "BTR" "SHV" "CHA" "GPT" "PFN" "CWA" "ROA" "FAY" "AVL" "OAJ" "HTS"
[193] "TRI" "LYH" "MYR" "FNT" "AGS" "ORH" "CHO" "ISO" "EVV" "UCA" "APF" "EYW"
[205] "BGM" "ITH" "ELM" "LFT" "GUM" "YAP" "ROR" "SPN" "MFE" "MLU" "CSG" "FCA"
[217] "HLN" "IDA" "JAC" "JNU" "BTM" "PIE" "TVL" "PHF" "BET" "OME" "OTZ" "SCC"
[229] "KTN" "CDV" "YAK" "SIT" "PSG" "WRG" "GUC" "HDN" "PIR"
COMM.RANK = 0
[1] "encoding training data..."
COMM.RANK = 0
[1] "Encoding time: 1.04 secs"
COMM.RANK = 0
[1] "constructing model matrix..."
COMM.RANK = 0
[1] "Encoded data needs about 0.100643 GB"
COMM.RANK = 0
[1] "Constructing time: 7.52 secs"
Loading required package: HsTrust
Loading required package: Rcpp
iter  1 act 6.290e+05 pre 5.622e+05 delta 2.101e+00 f 8.930e+05 |g| 6.073e+05 CG   1
iter  2 act 6.752e+04 pre 5.666e+04 delta 2.101e+00 f 2.640e+05 |g| 1.312e+05 CG   1
iter  3 act 1.004e+04 pre 8.876e+03 delta 2.101e+00 f 1.965e+05 |g| 3.630e+04 CG   1
cg reaches trust region boundary
iter  4 act 3.455e+03 pre 3.842e+03 delta 2.101e+00 f 1.865e+05 |g| 7.325e+03 CG   2
cg reaches trust region boundary
iter  5 act 1.077e+03 pre 1.035e+03 delta 2.240e+00 f 1.830e+05 |g| 3.475e+03 CG   4
cg reaches trust region boundary
iter  6 act 3.777e+02 pre 3.796e+02 delta 2.454e+00 f 1.820e+05 |g| 9.663e+02 CG   4
[1] "Training time: 47.22 secs"
[1] "Total time: 61.88"

全部只要一分鐘,就可以算完約120947440筆資料喔。用完之後記得關閉機器,畢竟AWS的「便宜」是建立在「使用的時候才算錢」,一直開著其實很貴的。

後續筆者有重複跑實驗,但是出來的速度就沒有這麼穩定。這其實會受到當下AWS環境的影響,但是大致上都是數分鐘內可以跑出結果。

結語

說實話,這裡Demo的數字並沒有很快。只要電腦夠好,在一台記憶體充足的電腦上,使用正確的工具和演算法,應該有機會可以更快地完成運算。但是這整套基於雲端運算的MPI有以下的特性:

處理不是非常非常大的數據,並不是只有Hadoop,MPI + R + AWS也是一種選項。

作者

Wush Wu (wush978@gmail.com)

附錄

安裝需要的工具

筆者很喜歡ubuntu的套件庫,因為它們已經納入許多R相關的套件:

  1. 在shell底下安裝R和openmpi

    sudo apt-get update
    sudo apt-get install r-base openmpi-bin samba cifs-utils pip
    sudo apt-get build-dep r-cran-rmpi
    
  2. 在R 底下安裝pbdMPI

    install.packages('pbdMPI')
    

ps. cifs-utils套件是因為筆者使用samba服務讓slave掛載master的磁碟,如果讀者要用其他方式,則請在slave上安裝對應的套件。

Samba(選擇性)

筆者是利用samba來讓所有機器掛載同樣的遠端磁碟,以進行SPMD架構的分析。熟悉NFS或其他遠端硬碟設定方式的讀者,可以按照自己習慣的方式做設定。

  1. 先建立Repository目錄:

    mkdir -p ~/Repository
    
  2. 分享資料夾 在/etc/samba/smb.conf最底下加入:

[Repository]
   path = /home/ubuntu/Repository
   browseable = no
   read only = yes
   guest ok = yes
  1. 重新啟動samba
sudo testparm
sudo service smbd restart

如此一來, slaves就可以掛載master上的資料夾了。

Rstudio Server(選擇性)

如果讀者想要直接在雲端上編輯程式碼,那筆者推薦使用rstudio server版本。安裝容易,而且提供用瀏覽器來編輯程式碼的功能。

在shell上安裝Rstudio server:

sudo apt-get install gdebi-core libapparmor1
wget http://download2.rstudio.org/rstudio-server-0.97.551-amd64.deb
sudo gdebi rstudio-server-0.97.551-amd64.deb

安裝完畢後,先設定使用者密碼:

sudo passwd ubuntu

接著只要防火牆有開port 8787,讀者可以打開http://public.domain.name.of.aws.instance:8787,輸入使用者(ubuntu)和剛剛設定的密碼後,應該會看到Rstudio的GUI介面。

細節請參考Rstudio 官方文件

之後,我們只要編輯master上的文件,透過samba則所有的Slave都可以即時運用編輯後的結果。

OpenSSH 設定

openmpi在跨機器時會使用openssh,所以這裡要針對openssh做一點設定:

  1. 不強制認證Host

編輯/etc/ssh/ssh_config,新增:

StrictHostKeyChecking no
  1. 在AWS建立key pair,取名為pbdMPI。此時讀者的電腦應會下載一個檔案:pbdMPI.pem
  2. pbdMPI.pem上傳到虛擬機器中,並且重新命名並放置於$HOME/.ssh/id_rsa

複製機器

請讀者利用剛剛製作的AMI:(MPI-slave),建立新的虛擬機器來當作slave吧!在設定時只需要注意:

  1. Access key-pair要選擇剛剛建立的pbdMPI
  2. 防火牆設定要把所有TCP/IP port打開。

設定IP

依序將master和slaves的private ip儲存成master下的~/pbdMPI.conf

xx.xx.xx.xx
xx.xx.xx.xx
xx.xx.xx.xx

如果instance少,讀者可以從AWS management console中一個一個查詢。如果instance多,可以使用AWS CLI來查ip,或是使用以下的python script來直接列出ip:

#!/usr/bin/python2.7
# -*- coding: utf-8 -*-

__author__ = "Wush Wu"
__copyright__ = "© 2013, Wush Wu"
__license__ = "GPL 3.0"

import boto.ec2
import os

conn = boto.ec2.connect_to_region("put the region here",
aws_access_key_id='put your aws access key id here',
aws_secret_access_key='put your aws secret access key here')

ec2_prototype_ip = ""
ec2_master_ip = ""

for res in conn.get_all_instances():
  for instance in res.instances:
    if instance.key_name != "put your key name here":
      continue
    if instance.state != "running":
      continue
    print instance.private_ip_address

不熟悉命令列編輯器的讀者,可以用Rstudio編輯器來編輯~/pbdMPI.conf喔。

讓Slave掛載master的磁碟

這裡筆者也提供一個R script來讓master透過~/pbdMPI.conf的內容來透過ssh讓slave 掛載master上分享的Samba磁碟。同時也可以用來測試ssh的設定是否正確。

#! /usr/bin/Rscript

# __author__ = "Wush Wu"
# __copyright__ = "© 2013, Wush Wu"
# __license__ = "GPL 3.0"

src <- readLines("~/pbdMPI.conf")
master.ip <- src[1]
slaves.ip <- tail(src, -1)

for(slave.ip in slaves.ip) {
  system(sprintf('ssh %s "sudo mount //%s/Repository ~/Repository -o guest" &', slave.ip, master.ip))
}

將上述程式碼存成mount.R後在shell執行:

Rscript mount.R

Hello pbdMPI

最後終於可以來測試pbdMPI的環境是否設定成功了!

依照以下內容建立~/Repository/hello.R

#! /usr/bin/Rscript

suppressPackageStartupMessages(library(pbdMPI))
init()
print(sprintf("Hello pbdMPI from node %d", comm.rank()))
finalize()

最後執行shell script:(以3台機器為例)

mpirun -np 3 --hostfile ~/pbdMPI.conf Rscript ~/Repository/hello.R

有沒有看到以下的訊息呢?

[1][1]
 "Hello pbdMPI from node 2"
 "Hello pbdMPI from node 1"
[1] "Hello pbdMPI from node 0"

順序可能會隨機改變,不過只要看到類似的內容,就代表我們成功的設定好pbdMPI的環境了!