R 語言基礎

讀取資料

# Parse a small whitespace-delimited table given inline; the first
# non-blank line supplies the column names.
test.data <- read.table(
  text = "
 a b
 1 2
 3 4
 ",
  header = TRUE
)
# Show the class of the object read.table() returned.
class(test.data)
## [1] "data.frame"

寫入資料

# Save the data frame twice: as space-separated text and as CSV.
write.table(x = test.data, file = "test.txt", sep = " ")
write.csv(x = test.data, file = "test.csv")

操作資料

# Load the built-in iris data and keep only the two sepal columns.
data(iris)
Sepal.iris <- iris[, c("Sepal.Length", "Sepal.Width")]
str(Sepal.iris)
## 'data.frame':    150 obs. of  2 variables:
##  $ Sepal.Length: num  5.1 4.9 4.7 4.6 5 5.4 4.6 5 4.4 4.9 ...
##  $ Sepal.Width : num  3.5 3 3.2 3.1 3.6 3.9 3.4 3.4 2.9 3.1 ...

資料篩選

# First five rows of the two sepal columns.
Five.Sepal.iris <- head(iris[c("Sepal.Length", "Sepal.Width")], n = 5)
Five.Sepal.iris
##   Sepal.Length Sepal.Width
## 1          5.1         3.5
## 2          4.9         3.0
## 3          4.7         3.2
## 4          4.6         3.1
## 5          5.0         3.6
# Keep every column for the rows whose species is setosa.
setosa.data <- iris[which(iris$Species == "setosa"), seq_len(5)]
head(setosa.data)
##   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
## 1          5.1         3.5          1.4         0.2  setosa
## 2          4.9         3.0          1.4         0.2  setosa
## 3          4.7         3.2          1.3         0.2  setosa
## 4          4.6         3.1          1.5         0.2  setosa
## 5          5.0         3.6          1.4         0.2  setosa
## 6          5.4         3.9          1.7         0.4  setosa
which(iris$Species=="setosa")
##  [1]  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
## [24] 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
## [47] 47 48 49 50

資料篩選

Sepal.data = subset(iris, select=c("Sepal.Length", "Sepal.Width"))
head(Sepal.data)
##   Sepal.Length Sepal.Width
## 1          5.1         3.5
## 2          4.9         3.0
## 3          4.7         3.2
## 4          4.6         3.1
## 5          5.0         3.6
## 6          5.4         3.9
setosa.data = subset(iris, Species =="setosa")
head(setosa.data)
##   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
## 1          5.1         3.5          1.4         0.2  setosa
## 2          4.9         3.0          1.4         0.2  setosa
## 3          4.7         3.2          1.3         0.2  setosa
## 4          4.6         3.1          1.5         0.2  setosa
## 5          5.0         3.6          1.4         0.2  setosa
## 6          5.4         3.9          1.7         0.4  setosa
# Species of the flowers with short petals (<= 1.4) that are at least
# 0.2 wide; only the Species column is kept.
example.data <- subset(
  iris,
  subset = Petal.Length <= 1.4 & Petal.Width >= 0.2,
  select = Species
)
head(example.data)
##   Species
## 1  setosa
## 2  setosa
## 3  setosa
## 5  setosa
## 7  setosa
## 9  setosa

資料合併

# One-row lookup table joined to the first three iris rows on Species.
flower.type <- data.frame(Species = "setosa", Flower = "iris")
merge(x = flower.type, y = iris[1:3, ], by = "Species")
##   Species Flower Sepal.Length Sepal.Width Petal.Length Petal.Width
## 1  setosa   iris          5.1         3.5          1.4         0.2
## 2  setosa   iris          4.9         3.0          1.4         0.2
## 3  setosa   iris          4.7         3.2          1.3         0.2

資料排序

# Order rows from the longest sepal to the shortest; rows with equal
# length keep their original relative order.
sorted_data <- iris[order(-iris$Sepal.Length), ]
head(sorted_data)
##     Sepal.Length Sepal.Width Petal.Length Petal.Width   Species
## 132          7.9         3.8          6.4         2.0 virginica
## 118          7.7         3.8          6.7         2.2 virginica
## 119          7.7         2.6          6.9         2.3 virginica
## 123          7.7         2.8          6.7         2.0 virginica
## 136          7.7         3.0          6.1         2.3 virginica
## 106          7.6         3.0          6.6         2.1 virginica

基本統計功能

# Descriptive statistics for the values 1 through 10 (stored as doubles).
x <- as.numeric(1:10)
mean(x)
## [1] 5.5
min(x)
## [1] 1
median(x)
## [1] 5.5
max(x)
## [1] 10
var(x)
## [1] 9.166667
summary(x)
##    Min. 1st Qu.  Median    Mean 3rd Qu.    Max. 
##    1.00    3.25    5.50    5.50    7.75   10.00

sapply

sapply(iris[1:4], mean, na.rm=TRUE)
## Sepal.Length  Sepal.Width Petal.Length  Petal.Width 
##     5.843333     3.057333     3.758000     1.199333

plyr

# Install plyr only when it is missing, so re-running the script does not
# reinstall the package every time.
if (!requireNamespace("plyr", quietly = TRUE)) {
  install.packages("plyr")
}
library(plyr)
head(iris)
# Mean sepal length for each species, computed on the per-species subsets.
ddply(iris, c("Species"), function(df) mean(df$Sepal.Length))

reshape

library(reshape)
# Long format: one row per (Species, measurement) pair. The argument is
# spelled out as id.vars instead of relying on partial matching of `id`.
iris.melt <- melt(iris, id.vars = "Species")
# Mean of every measurement for two species, plus a grand-total row.
cast(
  iris.melt,
  Species ~ variable,
  fun.aggregate = mean,
  subset = Species %in% c("setosa", "versicolor"),
  margins = "grand_row"
)
##      Species Sepal.Length Sepal.Width Petal.Length Petal.Width
## 1     setosa        5.006       3.428        1.462       0.246
## 2 versicolor        5.936       2.770        4.260       1.326
## 3      (all)        5.471       3.099        2.861       0.786

Aggregate

aggregate(x=iris[,1:4],by=list(iris$Species),FUN=mean)
##      Group.1 Sepal.Length Sepal.Width Petal.Length Petal.Width
## 1     setosa        5.006       3.428        1.462       0.246
## 2 versicolor        5.936       2.770        4.260       1.326
## 3  virginica        6.588       2.974        5.552       2.026

Reshape2

# library() stops with an error when reshape2 is not installed; require()
# would only return FALSE and let the later calls fail less clearly.
library(reshape2)
## Warning: package 'reshape2' was built under R version 3.1.2
## 
## Attaching package: 'reshape2'
## 
## The following objects are masked from 'package:reshape':
## 
##     colsplit, melt, recast
data(smiths)
head(smiths)
##      subject time age weight height
## 1 John Smith    1  33     90   1.87
## 2 Mary Smith    1  NA     NA   1.54
melt(smiths)
## Using subject as id variables
##      subject variable value
## 1 John Smith     time  1.00
## 2 Mary Smith     time  1.00
## 3 John Smith      age 33.00
## 4 Mary Smith      age    NA
## 5 John Smith   weight 90.00
## 6 Mary Smith   weight    NA
## 7 John Smith   height  1.87
## 8 Mary Smith   height  1.54
# Lower-case the column names so they can be referenced as month/day.
names(airquality) <- tolower(names(airquality))
# Melt to long format keyed by month and day, dropping missing values;
# id.vars is spelled out instead of relying on partial matching of `id`.
aqm <- melt(airquality, id.vars = c("month", "day"), na.rm = TRUE)
# Monthly mean of each measured variable.
dcast(aqm, month ~ variable, fun.aggregate = mean)
##   month    ozone  solar.r      wind     temp
## 1     5 23.61538 181.2963 11.622581 65.54839
## 2     6 29.44444 190.1667 10.266667 79.10000
## 3     7 59.11538 216.4839  8.941935 83.90323
## 4     8 59.96154 171.8571  8.793548 83.96774
## 5     9 31.44828 167.4333 10.180000 76.90000

Pie Chart

table.iris = table(iris$Species)
pie(table.iris)

Histogram

hist(iris$Sepal.Length)

Box Plot

boxplot(Petal.Width ~ Species, data = iris)

Scatter Plot

plot(x=iris$Petal.Length, y=iris$Petal.Width, col=iris$Species)

Classification Example

#install.packages("e1071")
library(e1071)
## Warning: package 'e1071' was built under R version 3.1.2
pairs(iris[1:4],main="Iris Data (red=setosa,green=versicolor,blue=virginica)", pch=21, bg=c("red","green3","blue")[unclass(iris$Species)])

classifier<-naiveBayes(iris[,1:4], iris[,5])
table(predict(classifier, iris[,-5]), iris[,5])
##             
##              setosa versicolor virginica
##   setosa         50          0         0
##   versicolor      0         47         3
##   virginica       0          3        47
classifier<-svm(iris[,1:4], iris[,5])
table(predict(classifier, iris[,-5]), iris[,5])
##             
##              setosa versicolor virginica
##   setosa         50          0         0
##   versicolor      0         48         2
##   virginica       0          2        48
prediction = predict(classifier, iris[,1:4])

探索鐵達尼號

library(dplyr)
data(Titanic)
str(Titanic)
titanic = data.frame(Titanic)

#過濾資料
titanic[titanic$Sex=="Male" & titanic$Age=="Adult", ]
filter(titanic, Sex == "Male", Age== "Adult")

#可以使用 AND, OR 與 IN 來過濾資料 
filter(titanic, Sex == "Male" | Class== "Crew")
filter(titanic, Sex == "Male" & Class== "Crew")
filter(titanic, Class %in% c('1st', 'Crew'))

#選擇欄位
titanic[, c("Sex","Age")]
select(titanic, Sex, Age)

#選擇性別到生存的欄位
select(titanic, Sex:Survived)

#選擇包含S 的欄位
select(titanic, contains("S"))


# Chaining: %>% binds more tightly than +, so the addition is parenthesised
# to send the whole summed vector through sum() and then sqrt().
(1:10 + 1:10) %>% sum() %>% sqrt()

#使用巢狀結構混和過濾與選擇欄位的用法
filter(select(titanic, Sex, Class, Age), Age == "Child")

#使用Then (%>%)
titanic %>%
    select(Sex, Class, Age) %>%
    filter(Age == "Child")

#使用Arrange 可以將資料做排序
titanic %>%
    select(Sex, Class, Freq, Age) %>%
    filter(Age=="Child") %>%
    arrange(Freq)

#由大到小排序 (desc)
titanic %>%
    select(Sex, Class, Freq, Age) %>%
    filter(Age=="Child") %>%
    arrange(desc(Freq))

#計算總和
 freqsum = titanic %>%
     select(Freq) %>%
     sum()

#使用mutate 新增欄位
titanic %>%
    select(Sex,Age,Freq) %>%
    mutate(portion= Freq/freqsum)

#儲存新欄位
titanic = titanic %>% mutate(portion= Freq/freqsum)

#統計各性別的人次總和
 titanic %>%
     group_by(Sex) %>%
     summarise(Sexsum = sum(Freq, na.rm=TRUE))

#統計多個欄位
titanic %>%
     group_by(Sex) %>%
     summarise_each(funs(sum), Freq, portion)

#針對多個欄位做統計

 titanic %>%
     group_by(Class) %>%
     summarise_each(funs(min(., na.rm=TRUE), max(., na.rm=TRUE)), matches("Freq"))

#一般計數
 titanic %>%
     select(Sex) %>%
     summarise_each(funs(n()))

#不重複計數
 titanic %>%
     select(Sex) %>%
     summarise_each(funs(n_distinct(Sex)))

#使用arrange 排序
titanic %>%
    group_by(Age, Sex) %>%
    summarise(frequency_sum = sum(Freq)) %>%
    arrange(desc(frequency_sum))

#使用tally取總和並排序
 titanic %>%
     group_by(Age, Sex) %>%
     tally(sort = TRUE)

#或使用table 函式
 titanic %>%
     group_by(Class) %>%
     select(Sex, Class) %>%
     table() %>%
     head()

#使用min_rank取分組排名前兩名
titanic %>%
    group_by(Class) %>%
    select(Sex,Age,Freq) %>%
    filter(min_rank(desc(Freq)) <= 2)

#使用top_n取分組排名前兩名
titanic %>%
    group_by(Class) %>%
    select(Sex,Age,Freq) %>%
    top_n(2)

#取得統計數
sex_stat = titanic %>%
    group_by(Sex) %>%
    summarise(sexsum = sum(Freq))

#繪圖
barplot(sex_stat$sexsum, names.arg=sex_stat$Sex, col=c("darkblue","red"))

# Pie chart of the passenger totals by sex; the argument is spelled
# `labels` instead of relying on partial matching of `label`.
pie(sex_stat$sexsum, labels = sex_stat$Sex)

# Bar chart of perished vs. survived counts for each sex.
# The pasted "+" console continuation prompts are removed so the pipeline
# runs, and summarise() replaces the deprecated summarise_each(funs(sum)).
survived_stat <- titanic %>%
    group_by(Survived, Sex) %>%
    summarise(Freq = sum(Freq))
# Wide table: one row per Survived level, one count column per sex.
survived_tb <- dcast(survived_stat, Survived ~ Sex, value.var = "Freq")
# Columns 2:3 hold the per-sex counts.
m <- as.matrix(survived_tb[2:3])
barplot(m, legend.text = c("Perished", "Survived"))

名字統計範例

#讀取華盛頓州新生嬰兒的出生與姓名統計
babyname = read.csv("/tmp/WA.txt", header=FALSE)
head(babyname)
colnames(babyname) = c("state", "sex", "year", "name", "freq")

#統計男/女前十大菜市場名

library(dplyr)
# Ten most frequent names given to girls born in 2012.
top10_female <- babyname %>%
  filter(year == 2012, sex == "F") %>%
  group_by(name) %>%
  summarise(count = sum(freq)) %>%
  arrange(desc(count)) %>%
  head(10)

# Ten most frequent names given to boys born in 2012.
top10_male <- babyname %>%
  filter(year == 2012, sex == "M") %>%
  group_by(name) %>%
  summarise(count = sum(freq)) %>%
  arrange(desc(count)) %>%
  head(10)

# Pie chart of the name counts; the argument is spelled `labels` instead
# of relying on partial matching of `label`.
pie(top10_male$count, labels = top10_male$name)

#使用ggplot2
library(ggplot2)
 # Density of mpg split by gear count. gear is numeric in mtcars, so it is
 # converted to a factor to get one filled curve per gear value.
 qplot(mpg, data = mtcars, geom = "density", fill = factor(gear),
       alpha = I(.5), main = "Distribution of Gas Milage",
       xlab = "Miles Per Gallon", ylab = "Density")

#重新使用ggplot2繪製pie 圖
 ggplot(top10_male, aes(x="", y=count, fill = name)) +
     geom_bar(width = 1, stat = "identity") + coord_polar(theta = "y")

#依菜市場名取出姓名趨勢
# Yearly count and within-(sex, year) percentage for the names that
# appear in the female top ten.
dF <- babyname %>%
  group_by(sex, year, name) %>%
  summarise(count = sum(freq)) %>%
  group_by(sex, year) %>%
  mutate(prop = round(count * 100 / sum(count), 3)) %>%
  filter(name %in% top10_female$name)

# Same calculation for the names that appear in the male top ten.
dM <- babyname %>%
  group_by(sex, year, name) %>%
  summarise(count = sum(freq)) %>%
  group_by(sex, year) %>%
  mutate(prop = round(count * 100 / sum(count), 3)) %>%
  filter(name %in% top10_male$name)

#女性姓名趨勢圖
ggplot(data = dF, aes(x=year, y=count, group=name)) + geom_path(aes(colour=name)) + geom_point(aes(colour=name)) 

#男性姓名趨勢圖
ggplot(data = dM[!is.na(dM$year),], aes(x=year, y=count, group=name)) + geom_path(aes(colour=name)) + geom_point(aes(colour=name)) + scale_x_continuous(breaks=1900:2014)

Linux 基本操作

觀看當前目錄檔案列表
ls

切換目錄
cd <directory>

建立目錄
mkdir  <directory>

瀏覽檔案
cat

瀏覽檔案前幾行與後幾行
head, tail

刪除檔案/目錄
rm <file>
rm -r <directory>

yum (CentOS使用, Ubuntu 可以使用apt-get)
yum install mlocate

更新索引資料庫
updatedb

以root 權限執行指令
sudo <command> 

列出網路介面參數
ifconfig

從網路下載檔案
wget

以字串做搜尋
grep

一頁頁讀取檔案
more

找出指令位置
which

Pipe - 接續Linux指令
 以| 做pipe
e.g. ls -ltr | head

排序資料
sort

列出不重複資料
uniq

編輯檔案
vi <filename>

切換模式
按下 i : 進入編輯mode
按下esc : 進入指令mode

離開VI
:x (:wq) 儲存且離開
:q! 強制離開

Hadoop 基本指令

瀏覽HDFS檔案
hadoop fs -ls

將檔案放到hdfs 上
hadoop fs -put test.txt ./

將檔案下載到本地端
hadoop fs -get test.txt ./test2.txt

瀏覽檔案內容
hadoop fs -cat test.txt

建立目錄
hadoop fs -mkdir test

刪除資料
hadoop fs -rm test.txt

安裝RHadoop

安裝rmr2 相關套件

$sudo R
> install.packages(c("codetools", "Rcpp", "RJSONIO", "bitops", "digest", "functional", "stringr", "plyr", "reshape2", "rJava", "caTools"))

安裝 rmr2

$ wget --no-check-certificate https://raw.githubusercontent.com/RevolutionAnalytics/rmr2/master/build/rmr2_3.1.2.tar.gz

$ sudo R CMD INSTALL rmr2_3.1.2.tar.gz

安裝 RHDFS

$ wget --no-check-certificate https://raw.github.com/RevolutionAnalytics/rhdfs/master/build/rhdfs_1.0.8.tar.gz

$ sudo HADOOP_CMD=/usr/bin/hadoop  R CMD INSTALL rhdfs_1.0.8.tar.gz

RHadoop 設定

設定 HADOOP_CMD
which hadoop

設定 HADOOP_STREAMING
locate streaming | grep jar | more #尋找jar 檔

啟動 hdfs

> Sys.setenv(HADOOP_CMD="/usr/bin/hadoop")
> Sys.setenv(HADOOP_STREAMING="/opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/hadoop-mapreduce/hadoop-streaming.jar")
> library(rhdfs)
> hdfs.init()

javareconf

$ echo $JAVA_HOME
$ sudo JAVA_HOME=/usr/java/jdk1.7.0_45-cloudera R CMD javareconf

使用rhdfs操作 HDFS

啟用rhdfs
hdfs.init ()

將資料從本地端放置HDFS.
hdfs.put('test.txt', './')

觀看當前目錄
hdfs.ls('./')

拷貝檔案
# Plain ASCII quotes: typographic quotes do not delimit strings in R.
hdfs.copy('test.txt', 'test2.txt')

新建資料夾test
# Plain ASCII quotes: typographic quotes do not delimit strings in R.
hdfs.mkdir('test')

將資料下載到本地端
hdfs.get('test.txt', '/home/cloudera/test3.txt')

將資料搬移到不同位置
# The opening quote of the destination was typographic; both are ASCII now.
hdfs.move('test.txt', './test/q1.txt')

重新命名
# Plain ASCII quotes: typographic quotes do not delimit strings in R.
hdfs.rename('./test/q1.txt', './test/test.txt')

變更目錄權限
hdfs.chmod('/RHadoop', permissions= '777')

刪除資料
# Two calls targeting the same path, both shown under the delete heading;
# quotes are plain ASCII so the paths parse as strings.
hdfs.delete('./test/')
hdfs.rm('./test/')

觀看檔案資訊
# Plain ASCII quotes: typographic quotes do not delimit strings in R.
hdfs.file.info('./')

rhdfs 寫入檔案

f = hdfs.file("test.txt","w")
data(iris)
hdfs.write(iris,f)
hdfs.close(f)

rhdfs 讀取檔案

f = hdfs.file("test.txt", "r")
dfserialized <- hdfs.read(f)
df <- unserialize(dfserialized)
df
hdfs.close(f)

使用rmr操作HDFS

將資料寫進hdfs
small.ints = to.dfs(1:10)

從hdfs 讀回資料
# The path is now a properly quoted string. The temp file name is an example
# and presumably differs per session -- substitute the one to.dfs() reported.
from.dfs('/tmp/file4eacda5ffa2')

使用RHadoop撰寫 MapReduce

將檔案移入HDFS

# Point the R session at the hadoop binary and the streaming jar.
Sys.setenv(HADOOP_CMD = "/usr/bin/hadoop")
Sys.setenv(HADOOP_STREAMING = "/opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/hadoop-mapreduce/hadoop-streaming.jar")
library(rmr2)
library(rhdfs)
hdfs.init()
# Create the input directory and upload the word-count input file.
# Fixed: curly quotes, and the "h" that had slipped off the start of hdfs.put.
hdfs.mkdir("/user/cloudera/wordcount/data")
hdfs.put("wc_input.txt", "/user/cloudera/wordcount/data")
$ hadoop fs -mkdir /user/cloudera/wordcount/data
$ hadoop fs -put wc_input.txt /user/cloudera/wordcount/data

Wordcount Mapper

# Word-count mapper: split the input lines into words and emit one
# (word, 1) pair per word.
#
# k     - input key; not used.
# lines - character vector of input lines.
#
# Returns the keyval() of the words and a count of 1 for each.
map <- function(k, lines) {
  # Split on runs of whitespace so repeated blanks do not create tokens.
  words <- unlist(strsplit(lines, "\\s+"))
  # Leading whitespace still yields one empty token; drop those so the
  # empty string is never counted as a word.
  words <- words[nzchar(words)]
  # One explicit count per word, so zero words give zero pairs.
  keyval(words, rep(1, length(words)))
}
/**
 * Java word-count mapper: emits (token, 1) for every token that
 * StringTokenizer finds in the input line.
 */
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    // Constant count and reusable key object shared by all emitted pairs.
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        for (StringTokenizer tokens = new StringTokenizer(value.toString()); tokens.hasMoreTokens(); ) {
            word.set(tokens.nextToken());
            output.collect(word, one);
        }
    }
}

Wordcount Reducer

# Word-count reducer: add up the counts collected for one word and emit
# the (word, total) pair.
reduce <- function(word, counts) {
  total <- sum(counts)
  keyval(word, total)
}
/**
 * Java word-count reducer: sums the counts received for a key and
 * emits (key, total).
 */
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int total = 0;
        for (; values.hasNext(); ) {
            total += values.next().get();
        }
        output.collect(key, new IntWritable(total));
    }
}

呼叫 Wordcount

# HDFS paths used by the job: input under <root>/data, output in <root>/out.
hdfs.root <- "wordcount"
hdfs.data <- file.path(hdfs.root, "data")
hdfs.out <- file.path(hdfs.root, "out")

# Run the word-count job over text input, using the map and reduce
# functions defined earlier in this file.
wordcount <- function(input, output = NULL) {
  mapreduce(
    input = input,
    output = output,
    input.format = "text",
    map = map,
    reduce = reduce
  )
}

out <- wordcount(hdfs.data, hdfs.out)

從HDFS讀取資料

# Fetch the job output from HDFS.
results <- from.dfs(out)
# Ten most frequent words. head() returns fewer entries when there are
# fewer than ten keys, where [1:10] would pad the result with NA.
head(results$key[order(results$val, decreasing = TRUE)], 10)
$ hadoop fs -cat /user/cloudera/wordcount/out/part-00000 | sort -k 2 -nr | head -n 10

MapReduce 效能評比

> a.time <- proc.time() 
> small.ints2=1:100000 
> result.normal = sapply(small.ints2, function(x) x^2) 
> proc.time() - a.time 
> b.time <- proc.time() 
> small.ints= to.dfs(1:100000) 
> result = mapreduce(input = small.ints, map = function(k,v)        cbind(v,v^2)) 
> proc.time() - b.time 

實戰rmr2

如何debug 中間的變數

# Use the local backend and pass the mapper's value argument to rmr.str().
rmr.options(backend = "local")
out <- mapreduce(input = to.dfs(1), map = function(k, v) rmr.str(v))

第一支R MapReduce 程式

# Square every value from 1 to 100 with a map-only job.
rmr.options(backend = "local")
small.ints <- to.dfs(1:100)
mapr <- mapreduce(
  input = small.ints,
  map = function(k, v) cbind(v, v^2)
)
# Fetch the job result back into the R session and print it.
result <- from.dfs(mapr)
result

分組計算資料

data(mtcars)
tapply(mtcars$mpg, mtcars$gear, sum)

改成mapreduce 版本前

rmr.options(backend = 'local')
out = from.dfs(to.dfs(mtcars))
out

新增個mapper

# Map-only job that keys each record's mpg value by its gear count.
sumup <- function(input, output = NULL) {
  # Mapper: emit (gear, mpg) for the incoming rows.
  gear.mpg.map <- function(., row) keyval(row$gear, row$mpg)
  mapreduce(input = input, output = output, map = gear.mpg.map)
}

新增個mapper

# Same map-only job as before, with the mapper's input passed to
# rmr.str() before the (gear, mpg) pairs are emitted.
sumup <- function(input, output = NULL) {
  gear.mpg.map <- function(., row) {
    rmr.str(row)
    keyval(row$gear, row$mpg)
  }
  mapreduce(input = input, output = output, map = gear.mpg.map)
}

新增個reducer

# Sum of mpg per gear count: the mapper emits (gear, mpg) and the reducer
# adds up the mpg values it receives for one key.
wordcount <- function(input, output = NULL) {
  # Mapper: key = gear, value = mpg.
  gear.mpg.map <- function(., row) keyval(row$gear, row$mpg)
  # Reducer: total of the values grouped under one key.
  sum.reduce <- function(word, val) keyval(word, sum(val))
  mapreduce(
    input = input,
    output = output,
    map = gear.mpg.map,
    reduce = sum.reduce
  )
}

使用rmr.str 觀察變數

# Sum of mpg per gear count, with the reducer's values passed to
# rmr.str() before they are summed.
wordcount <- function(input, output = NULL) {
  # Mapper: key = gear, value = mpg.
  gear.mpg.map <- function(., row) keyval(row$gear, row$mpg)
  # Reducer: inspect the grouped values, then emit their total.
  sum.reduce <- function(word, val) {
    rmr.str(val)
    keyval(word, sum(val))
  }
  mapreduce(
    input = input,
    output = output,
    map = gear.mpg.map,
    reduce = sum.reduce
  )
}

呼叫mapreduce 程式

rmr.options(backend = 'local')
out = from.dfs(wordcount(to.dfs(keyval(NULL, mtcars))))
out

讀入資料

solutions = read.csv(file="solutions.csv", header=TRUE)
reviews = read.csv(file="reviews.csv", header=TRUE)
str(solutions)
str(reviews)

合併資料

merge(df1,df2, by.x="id", by.y="id", all=TRUE)

小量試產

rmr.options(backend = 'local')
rv = to.dfs(keyval(NULL, cbind(reviews[1:3,], "rv")))
sl = to.dfs(keyval(NULL, cbind(solutions[1:3,], "sl")))
out = from.dfs(tablejoin(c(rv,sl)))
out

Join 的map reduce 思維

# Join mapper: key every record by its first column and pass the whole
# record through as the value.
wc.map <- function(., row) {
  keyval(row[1], row)
}

# Join reducer: place the first and second rows received for a key side
# by side, prefixed "left" and "right".
wc.reduce <- function(word, val) {
  joined <- data.frame(left = val[1, ], right = val[2, ])
  keyval(word, joined)
}

equijoin

# Join two key-value sets on keys 1..10: squares on the left, cubes on the
# right. ^ binds tighter than :, so 1:10^2 means 1:100; the parentheses give
# each key exactly one value per side.
from.dfs(
  equijoin(
    left.input = to.dfs(keyval(1:10, (1:10)^2)),
    right.input = to.dfs(keyval(1:10, (1:10)^3))
  )
)