Using the Quartet dataset

library(car)  # the Quartet data (Anscombe's quartet) ships with the car package
data(Quartet)
str(Quartet)

Simple linear regression

plot(Quartet$x, Quartet$y1)
lmfit = lm(Quartet$y1~Quartet$x)
abline(lmfit, col="red") 
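To inspect the fitted intercept, slope, and R-squared, the standard lm accessors apply:

summary(lmfit)  # coefficients with standard errors, R-squared
coef(lmfit)     # intercept and slope only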

Simple linear regression with lsfit

plot(Quartet$x, Quartet$y1)
lmfit2 = lsfit(Quartet$x,Quartet$y1)
abline(lmfit2, col="red") 

Second-degree polynomial regression

plot(Quartet$x, Quartet$y2)
lmfit = lm(Quartet$y2~poly(Quartet$x,2))
lines(sort(Quartet$x), lmfit$fitted.values[order(Quartet$x)], col = "red")
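A sketch of the same quadratic fit written with lm's data argument, which makes predicting on a smooth grid easier (the dashed blue curve should coincide with the red one):

lmfit <- lm(y2 ~ poly(x, 2), data = Quartet)
xs <- seq(min(Quartet$x), max(Quartet$x), length.out = 100)
lines(xs, predict(lmfit, newdata = data.frame(x = xs)), col = "blue", lty = 2)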

Robust regression

library(MASS)  # rlm comes from the MASS package
plot(Quartet$x, Quartet$y3)
lmfit = rlm(Quartet$y3~Quartet$x)
abline(lmfit, col="red")
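For comparison, a sketch overlaying the ordinary least-squares fit, which the outlier in y3 distorts while rlm resists it:

abline(lm(y3 ~ x, data = Quartet), col = "blue", lty = 2)  # OLS line, pulled by the outlier
legend("topleft", legend = c("rlm", "lm"), col = c("red", "blue"), lty = c(1, 2))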

Loading the breast cancer dataset

require(mlbench)
## Loading required package: mlbench
data(BreastCancer)
BreastCancer <- na.omit(BreastCancer) 
BreastCancer$Id <- NULL 
set.seed(2)
ind <- sample(2, nrow(BreastCancer), replace = TRUE, prob=c(0.8, 0.2))
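A quick sanity check that the sample split is roughly 80/20:

table(ind)  # counts of training (1) and test (2) rows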

Building a decision tree

require(rpart)
## Loading required package: rpart
x.rp <- rpart(Class ~ ., data=BreastCancer[ind == 1,])
x.rp.pred <- predict(x.rp, type="class", newdata=BreastCancer[ind == 2,])
x.rp.prob <- predict(x.rp, type="prob", newdata=BreastCancer[ind == 2,])
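A minimal accuracy check on the held-out rows, via a confusion matrix:

cm <- table(predicted = x.rp.pred, actual = BreastCancer[ind == 2, "Class"])
cm
sum(diag(cm)) / sum(cm)  # overall accuracy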

Plotting the decision tree

plot(x.rp, main="Decision tree created using rpart")
text(x.rp)  # label the splits and leaves; plot() alone draws an unlabeled tree


Building the model with an SVM instead

library(e1071)
x.svm <- svm(Class~., data = BreastCancer[ind == 1,])

Computing the ROC curve

require(ROCR)
## Loading required package: ROCR
x.rp.prob.rocr <- prediction(x.rp.prob[,2], BreastCancer[ind == 2,'Class'])
x.rp.perf <- performance(x.rp.prob.rocr, "tpr","fpr")

Plotting the ROC curve

plot(x.rp.perf, col=2, main="ROC curves comparing classification performance of five machine learning models")
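To actually compare models on one plot, the SVM curve can be overlaid; a sketch, assuming the model is refit with probability = TRUE so e1071 returns class probabilities (the positive class in BreastCancer is "malignant"):

x.svm <- svm(Class ~ ., data = BreastCancer[ind == 1,], probability = TRUE)
x.svm.prob <- predict(x.svm, newdata = BreastCancer[ind == 2,], probability = TRUE)
x.svm.prob.rocr <- prediction(attr(x.svm.prob, "probabilities")[, "malignant"],
                              BreastCancer[ind == 2, "Class"])
x.svm.perf <- performance(x.svm.prob.rocr, "tpr", "fpr")
plot(x.svm.perf, col = 4, add = TRUE)  # overlay on the existing ROC plot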


Principal component analysis

dataset <- read.csv('eco_index.csv', header=TRUE, sep=',', row.names=1)
pc.cr <- princomp(dataset, cor = TRUE) 
plot(pc.cr)
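To read off how much variance each component captures:

summary(pc.cr)   # proportion of variance explained per component
loadings(pc.cr)  # variable loadings on each component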

Drawing the scree plot

screeplot(pc.cr,  type="lines")
abline(h=1, lty=3)

PCA biplot

biplot(pc.cr)

Bar plot of the first component scores

barplot(sort(-pc.cr$scores[,1], TRUE))

Hierarchical clustering

mydata <- read.csv('costumer_segment.txt', header=TRUE, sep='\t')
mydata <- scale(mydata) 
d <- dist(mydata, method = "euclidean")
fit <- hclust(d, method="ward.D")  # "ward" was renamed "ward.D" in R >= 3.1.0

Plotting the dendrogram

plot(fit)

Choosing where to cut the tree

k1 = 4
groups <- cutree(fit, k=k1)
rect.hclust(fit, k=k1, border="red") 
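Cluster sizes after the cut:

table(groups)  # number of customers in each of the k1 clusters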

K-means clustering

fit <- kmeans(mydata, k1)
plot(mydata, col = fit$cluster) 

Plotting clusters on the principal components

library(cluster)
clusplot(mydata, fit$cluster, color=TRUE, shade=TRUE, lines=0) 

Basic Hadoop commands

List files on HDFS
hadoop fs -ls

Put a file onto HDFS
hadoop fs -put test.txt ./

Download a file to the local machine
hadoop fs -get test.txt ./test2.txt

View file contents
hadoop fs -cat test.txt

Create a directory
hadoop fs -mkdir test

Delete a file
hadoop fs -rm test.txt 

Installing RHadoop

Installing the rmr2 dependencies

$ sudo R
> install.packages(c("codetools", "Rcpp", "RJSONIO", "bitops", "digest", "functional", "stringr", "plyr", "reshape2", "rJava", "caTools"))

Installing rmr2

$ wget --no-check-certificate https://raw.githubusercontent.com/RevolutionAnalytics/rmr2/master/build/rmr2_3.1.2.tar.gz

$ sudo R CMD INSTALL rmr2_3.1.2.tar.gz

Installing rhdfs

$ wget --no-check-certificate https://raw.github.com/RevolutionAnalytics/rhdfs/master/build/rhdfs_1.0.8.tar.gz

$ sudo HADOOP_CMD=/usr/bin/hadoop  R CMD INSTALL rhdfs_1.0.8.tar.gz

Configuring RHadoop

Finding HADOOP_CMD
which hadoop

Finding HADOOP_STREAMING
locate streaming | grep jar | more  # find the streaming jar

Initializing rhdfs

> Sys.setenv(HADOOP_CMD="/usr/bin/hadoop")
> Sys.setenv(HADOOP_STREAMING="/opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/hadoop-mapreduce/hadoop-streaming.jar")
> library(rhdfs)
> hdfs.init()

javareconf

$ echo $JAVA_HOME
$ sudo JAVA_HOME=/usr/java/jdk1.7.0_45-cloudera R CMD javareconf

Manipulating HDFS with rhdfs

Initialize rhdfs
hdfs.init()

Put a local file onto HDFS
hdfs.put('test.txt', './')

List the current directory
hdfs.ls('./')

Copy a file
hdfs.copy('test.txt', 'test2.txt')

Create a directory named test
hdfs.mkdir('test')

Download a file to the local machine
hdfs.get('test.txt', '/home/cloudera/test3.txt')

Move a file to another location
hdfs.move('test.txt', './test/q1.txt')

Rename a file
hdfs.rename('./test/q1.txt', './test/test.txt')

Change directory permissions
hdfs.chmod('/RHadoop', permissions= '777')

Delete files or directories
hdfs.delete('./test/')
hdfs.rm('./test/')

View file information
hdfs.file.info('./')

Writing a file with rhdfs

f = hdfs.file("test.txt","w")
data(iris)
hdfs.write(iris,f)
hdfs.close(f)

Reading a file with rhdfs

f = hdfs.file("test.txt", "r")
dfserialized <- hdfs.read(f)
df <- unserialize(dfserialized)
df
hdfs.close(f)

Manipulating HDFS with rmr2

Write data to HDFS
small.ints = to.dfs(1:10)

Read data back from HDFS (by the temporary file created by to.dfs)
from.dfs('/tmp/file4eacda5ffa2')
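from.dfs returns a key/value structure; the object returned by to.dfs can also be read back directly, which avoids hard-coding the temporary path:

out <- from.dfs(small.ints)
out$key  # NULL, since no keys were written
out$val  # the original vector 1:10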

Writing MapReduce with RHadoop

Putting the input file into HDFS

Sys.setenv(HADOOP_CMD="/usr/bin/hadoop")
Sys.setenv(HADOOP_STREAMING="/opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/hadoop-mapreduce/hadoop-streaming.jar")
library(rmr2) 
library(rhdfs) 
hdfs.init() 
hdfs.mkdir("/user/cloudera/wordcount/data")
hdfs.put("wc_input.txt", "/user/cloudera/wordcount/data") 
The same steps from the shell:
$ hadoop fs -mkdir /user/cloudera/wordcount/data
$ hadoop fs -put wc_input.txt /user/cloudera/wordcount/data

Wordcount Mapper

map <- function(k,lines) {
  words.list <- strsplit(lines, '\\s')
  words <- unlist(words.list)
  return( keyval(words, 1) ) 
} 
The equivalent mapper in Hadoop's Java API:

public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { 
    private final static IntWritable one = new IntWritable(1); 
    private Text word = new Text(); 

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
        String line = value.toString(); 
        StringTokenizer tokenizer = new StringTokenizer(line); 
        while (tokenizer.hasMoreTokens()) {   
            word.set(tokenizer.nextToken()); 
            output.collect(word, one); 
        } 
    } 
}

Wordcount Reducer

reduce <- function(word, counts) { 
  keyval(word, sum(counts)) 
} 
The equivalent reducer in Hadoop's Java API:

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { 
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
        int sum = 0; 
        while (values.hasNext()) { 
            sum += values.next().get(); 
        } 
        output.collect(key, new IntWritable(sum)); 
    } 
} 

Running wordcount

hdfs.root <- 'wordcount' 
hdfs.data <- file.path(hdfs.root, 'data') 
hdfs.out <- file.path(hdfs.root, 'out') 
wordcount <- function (input, output=NULL) { 
  mapreduce(input=input, output=output, input.format="text", map=map, reduce=reduce) 
} 
out <- wordcount(hdfs.data, hdfs.out)

Reading the results from HDFS

results <- from.dfs(out) 
results$key[order(results$val, decreasing = TRUE)][1:10]
$ hadoop fs -cat /user/cloudera/wordcount/out/part-00000 | sort -k 2 -nr | head -n 10

Benchmarking MapReduce

> a.time <- proc.time() 
> small.ints2=1:100000 
> result.normal = sapply(small.ints2, function(x) x^2) 
> proc.time() - a.time 
> b.time <- proc.time() 
> small.ints= to.dfs(1:100000) 
> result = mapreduce(input = small.ints, map = function(k,v) cbind(v,v^2)) 
> proc.time() - b.time 

Hands-on rmr2

How to debug intermediate variables

rmr.options(backend = 'local')
out = mapreduce(to.dfs(1), map = function(k, v) rmr.str(v))

Your first R MapReduce program

# square every value
rmr.options(backend = 'local')
small.ints = to.dfs(1:100)
mapr = mapreduce(input = small.ints, 
                 map = function(k,v) cbind(v,v^2)) 
result = from.dfs(mapr)
result

Aggregating data by group

data(mtcars)
tapply(mtcars$mpg, mtcars$gear, sum)

Before converting to MapReduce

rmr.options(backend = 'local')
out = from.dfs(to.dfs(mtcars))
out

Adding a mapper

sumup = function(input,output = NULL){
  ## sumup-map
  wc.map = function(., row) {
    k = row$mpg 
    keyval(row$gear, k)}
  mapreduce(
    input = input,
    output = output,
    map = wc.map
    )}

Inspecting the map input with rmr.str

##  sumup
sumup = function(input,output = NULL){
  ## sumup-map
  wc.map = function(., row) {
    k = row$mpg 
    rmr.str(row)
    keyval(row$gear, k)}
  mapreduce(
    input = input,
    output = output,
    map = wc.map
    )}

Adding a reducer

##  sumup
wordcount = function(input,output = NULL){
    ## sumup-map
    wc.map = function(., row) {
        k = row$mpg 
        keyval(row$gear, k)}
    ##  sumup-reduce
    wc.reduce =
      function(word, val ) {
        keyval(word, sum(val))}
    ##  sumup-mapreduce
    mapreduce(
      input = input,
      output = output,
      map = wc.map,
      reduce = wc.reduce
)}

Observing variables with rmr.str

##  sumup
wordcount = function(input,output = NULL){
    ## sumup-map
    wc.map = function(., row) {
        k = row$mpg 
        keyval(row$gear, k)}
    ##  sumup-reduce
    wc.reduce =
      function(word, val ) {
        rmr.str(val) 
        keyval(word, sum(val))}
    ##  sumup-mapreduce
    mapreduce(
      input = input,
      output = output,
      map = wc.map,
      reduce = wc.reduce
)}

Running the MapReduce job

rmr.options(backend = 'local')
out = from.dfs(wordcount(to.dfs(keyval(NULL, mtcars))))
out

Reading in the data

solutions = read.csv(file="solutions.csv", header=TRUE)
reviews = read.csv(file="reviews.csv", header=TRUE)
str(solutions)
str(reviews)

Merging data

# merge the two tables on their shared id column
merged <- merge(solutions, reviews, by.x="id", by.y="id", all=TRUE)

A small trial run (tablejoin is defined in the next section)

rmr.options(backend = 'local')
rv = to.dfs(keyval(NULL, cbind(reviews[1:3,], "rv")))
sl = to.dfs(keyval(NULL, cbind(solutions[1:3,], "sl")))
out = from.dfs(tablejoin(c(rv,sl)))
out

The MapReduce way of thinking about a join

##  tablejoin: key each row by its join column, then pair rows meeting at the same key
tablejoin = function(input, output = NULL){
  ##  tablejoin-map 
  wc.map = function(., row) {
    keyval(row[1], row)
  }
  ##  tablejoin-reduce
  wc.reduce =
    function(word, val ) {
      keyval(word, data.frame(left = val[1,], right = val[2,]))
      }
  mapreduce(
    input = input,
    output = output,
    map = wc.map,
    reduce = wc.reduce
)}

equijoin

from.dfs(equijoin(left.input = to.dfs(keyval(1:10, 1:10^2)),
                  right.input = to.dfs(keyval(1:10, 1:10^3))))
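equijoin also accepts an outer argument ("left", "right", or "full") to keep unmatched keys, per the rmr2 documentation; a sketch:

from.dfs(equijoin(left.input  = to.dfs(keyval(1:5, letters[1:5])),
                  right.input = to.dfs(keyval(3:8, LETTERS[3:8])),
                  outer = "full"))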