---
author: "David Chiu"
date: "Friday, August 22, 2014"
email: david@largitdata.com
output: html_document
---
test.data = read.table(header = TRUE, text = "
a b
1 2
3 4
")
class(test.data)
## [1] "data.frame"
write.table(test.data, file = "test.txt", sep = " ")
write.csv(test.data, file = "test.csv")
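To verify the round trip, the files can be read back in (a quick check, assuming they were written to the working directory; write.csv stores row names in the first column, hence row.names = 1):
read.table("test.txt", header = TRUE, sep = " ")
read.csv("test.csv", row.names = 1)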
data(iris)
Sepal.iris = iris[c("Sepal.Length", "Sepal.Width")]
str(Sepal.iris)
## 'data.frame': 150 obs. of 2 variables:
## $ Sepal.Length: num 5.1 4.9 4.7 4.6 5 5.4 4.6 5 4.4 4.9 ...
## $ Sepal.Width : num 3.5 3 3.2 3.1 3.6 3.9 3.4 3.4 2.9 3.1 ...
Five.Sepal.iris = iris[1:5, c("Sepal.Length", "Sepal.Width")]
Five.Sepal.iris
## Sepal.Length Sepal.Width
## 1 5.1 3.5
## 2 4.9 3.0
## 3 4.7 3.2
## 4 4.6 3.1
## 5 5.0 3.6
setosa.data = iris[iris$Species=="setosa",1:5]
head(setosa.data)
## Sepal.Length Sepal.Width Petal.Length Petal.Width Species
## 1 5.1 3.5 1.4 0.2 setosa
## 2 4.9 3.0 1.4 0.2 setosa
## 3 4.7 3.2 1.3 0.2 setosa
## 4 4.6 3.1 1.5 0.2 setosa
## 5 5.0 3.6 1.4 0.2 setosa
## 6 5.4 3.9 1.7 0.4 setosa
which(iris$Species=="setosa")
## [1] 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
## [24] 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
## [47] 47 48 49 50
Sepal.data = subset(iris, select=c("Sepal.Length", "Sepal.Width"))
head(Sepal.data)
## Sepal.Length Sepal.Width
## 1 5.1 3.5
## 2 4.9 3.0
## 3 4.7 3.2
## 4 4.6 3.1
## 5 5.0 3.6
## 6 5.4 3.9
setosa.data = subset(iris, Species =="setosa")
head(setosa.data)
## Sepal.Length Sepal.Width Petal.Length Petal.Width Species
## 1 5.1 3.5 1.4 0.2 setosa
## 2 4.9 3.0 1.4 0.2 setosa
## 3 4.7 3.2 1.3 0.2 setosa
## 4 4.6 3.1 1.5 0.2 setosa
## 5 5.0 3.6 1.4 0.2 setosa
## 6 5.4 3.9 1.7 0.4 setosa
example.data = subset(iris, Petal.Length <= 1.4 & Petal.Width >= 0.2, select = Species)
head(example.data)
## Species
## 1 setosa
## 2 setosa
## 3 setosa
## 5 setosa
## 7 setosa
## 9 setosa
flower.type = data.frame(Species = "setosa", Flower = "iris")
merge(flower.type, iris[1:3,], by ="Species")
## Species Flower Sepal.Length Sepal.Width Petal.Length Petal.Width
## 1 setosa iris 5.1 3.5 1.4 0.2
## 2 setosa iris 4.9 3.0 1.4 0.2
## 3 setosa iris 4.7 3.2 1.3 0.2
sorted_data = iris[order(iris$Sepal.Length, decreasing = TRUE),]
head(sorted_data)
## Sepal.Length Sepal.Width Petal.Length Petal.Width Species
## 132 7.9 3.8 6.4 2.0 virginica
## 118 7.7 3.8 6.7 2.2 virginica
## 119 7.7 2.6 6.9 2.3 virginica
## 123 7.7 2.8 6.7 2.0 virginica
## 136 7.7 3.0 6.1 2.3 virginica
## 106 7.6 3.0 6.6 2.1 virginica
x = c(1,2,3,4,5,6,7,8,9,10)
mean(x)
## [1] 5.5
min(x)
## [1] 1
median(x)
## [1] 5.5
max(x)
## [1] 10
var(x)
## [1] 9.167
summary(x)
## Min. 1st Qu. Median Mean 3rd Qu. Max.
## 1.00 3.25 5.50 5.50 7.75 10.00
sapply(iris[1:4], mean, na.rm=TRUE)
## Sepal.Length Sepal.Width Petal.Length Petal.Width
## 5.843 3.057 3.758 1.199
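Since these four columns are all numeric, colMeans gives the same result:
colMeans(iris[1:4], na.rm = TRUE)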
install.packages("plyr")
library(plyr)
head(iris)
ddply(iris, c("Species"), function(df) mean(df$Sepal.Length))
library(reshape)
iris.melt <- melt(iris,id='Species')
cast(Species~variable,data=iris.melt,mean,
subset=Species %in% c("setosa","versicolor"),
margins="grand_row")
## Species Sepal.Length Sepal.Width Petal.Length Petal.Width
## 1 setosa 5.006 3.428 1.462 0.246
## 2 versicolor 5.936 2.770 4.260 1.326
## 3 (all) 5.471 3.099 2.861 0.786
aggregate(x=iris[,1:4],by=list(iris$Species),FUN=mean)
## Group.1 Sepal.Length Sepal.Width Petal.Length Petal.Width
## 1 setosa 5.006 3.428 1.462 0.246
## 2 versicolor 5.936 2.770 4.260 1.326
## 3 virginica 6.588 2.974 5.552 2.026
require(reshape2)
## Loading required package: reshape2
##
## Attaching package: 'reshape2'
##
## The following objects are masked from 'package:reshape':
##
## colsplit, melt, recast
data(smiths)
head(smiths)
## subject time age weight height
## 1 John Smith 1 33 90 1.87
## 2 Mary Smith 1 NA NA 1.54
melt(smiths)
## Using subject as id variables
## subject variable value
## 1 John Smith time 1.00
## 2 Mary Smith time 1.00
## 3 John Smith age 33.00
## 4 Mary Smith age NA
## 5 John Smith weight 90.00
## 6 Mary Smith weight NA
## 7 John Smith height 1.87
## 8 Mary Smith height 1.54
names(airquality) <- tolower(names(airquality))
aqm <- melt(airquality, id=c("month", "day"), na.rm=TRUE)
dcast(aqm, month ~ variable, mean)
## month ozone solar.r wind temp
## 1 5 23.62 181.3 11.623 65.55
## 2 6 29.44 190.2 10.267 79.10
## 3 7 59.12 216.5 8.942 83.90
## 4 8 59.96 171.9 8.794 83.97
## 5 9 31.45 167.4 10.180 76.90
table.iris = table(iris$Species)
pie(table.iris)
hist(iris$Sepal.Length)
boxplot(Petal.Width ~ Species, data = iris)
plot(x=iris$Petal.Length, y=iris$Petal.Width, col=iris$Species)
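The point colours follow the integer codes of the Species factor; a legend makes them readable (a small addition to the original plot):
legend("topleft", legend = levels(iris$Species), col = 1:3, pch = 1)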
#install.packages("e1071")
library(e1071)
pairs(iris[1:4], main = "Iris Data (red=setosa, green=versicolor, blue=virginica)",
      pch = 21, bg = c("red", "green3", "blue")[unclass(iris$Species)])
classifier<-naiveBayes(iris[,1:4], iris[,5])
table(predict(classifier, iris[,-5]), iris[,5])
##
## setosa versicolor virginica
## setosa 50 0 0
## versicolor 0 47 3
## virginica 0 3 47
classifier<-svm(iris[,1:4], iris[,5])
table(predict(classifier, iris[,-5]), iris[,5])
##
## setosa versicolor virginica
## setosa 50 0 0
## versicolor 0 48 2
## virginica 0 2 48
prediction = predict(classifier, iris[,1:4])
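A quick follow-up is the overall training accuracy of the SVM predictions (an illustrative check, not part of the original):
mean(prediction == iris$Species)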
List files in the current directory
ls
Change directory
cd <directory>
Create a directory
mkdir <directory>
View a file
cat
View the first or last few lines of a file
head, tail
Delete a file / directory
rm <file>
rm -r <directory>
Install software packages
yum (on CentOS; Ubuntu can use apt-get)
yum install mlocate
Update the locate index database
updatedb
Run a command with root privileges
sudo <command>
List network interface parameters
ifconfig
Download a file from the network
wget
Search for a string
grep
Read a file one page at a time
more
Find the location of a command
which
Pipe - chain Linux commands together
use | to pipe
e.g. ls -ltr | head
Sort data
sort
List deduplicated lines
uniq
Edit a file
vi <filename>
Switch modes
press i : enter insert mode
press Esc : return to command mode
Exit vi
:x (or :wq) save and quit
:q! quit without saving
Browse HDFS files
hadoop fs -ls
Put a file onto HDFS
hadoop fs -put test.txt ./
Download a file to the local machine
hadoop fs -get test.txt ./test2.txt
View file contents
hadoop fs -cat test.txt
Create a directory
hadoop fs -mkdir test
Delete data
hadoop fs -rm test.txt
$ sudo R
> install.packages(c("codetools", "Rcpp", "RJSONIO", "bitops", "digest", "functional", "stringr", "plyr", "reshape2", "rJava", "caTools"))
$ wget --no-check-certificate https://raw.githubusercontent.com/RevolutionAnalytics/rmr2/master/build/rmr2_3.1.2.tar.gz
$ sudo R CMD INSTALL rmr2_3.1.2.tar.gz
$ wget --no-check-certificate https://raw.github.com/RevolutionAnalytics/rhdfs/master/build/rhdfs_1.0.8.tar.gz
$ sudo HADOOP_CMD=/usr/bin/hadoop R CMD INSTALL rhdfs_1.0.8.tar.gz
Set HADOOP_CMD
which hadoop
Set HADOOP_STREAMING
locate streaming | grep jar | more  # find the streaming jar
> Sys.setenv(HADOOP_CMD="/usr/bin/hadoop")
> Sys.setenv(HADOOP_STREAMING="/opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/hadoop-mapreduce/hadoop-streaming.jar")
> library(rhdfs)
> hdfs.init()
$ echo $JAVA_HOME
$ sudo JAVA_HOME=/usr/java/jdk1.7.0_45-cloudera R CMD javareconf
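R CMD javareconf updates R's Java configuration so that rJava, which rhdfs depends on, can find the JDK pointed to by JAVA_HOME.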
Initialize rhdfs
hdfs.init()
Put data from the local machine onto HDFS
hdfs.put('test.txt', './')
View the current directory
hdfs.ls('./')
Copy a file
hdfs.copy('test.txt', 'test2.txt')
Create the directory test
hdfs.mkdir('test')
Download data to the local machine
hdfs.get('test.txt', '/home/cloudera/test3.txt')
Move data to a different location
hdfs.move('test.txt', './test/q1.txt')
Rename a file
hdfs.rename('./test/q1.txt', './test/test.txt')
Change directory permissions
hdfs.chmod('/RHadoop', permissions = '777')
Delete data
hdfs.delete('./test/')
hdfs.rm('./test/')
View file information
hdfs.file.info('./')
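Write an R object to an HDFS file and read it back: hdfs.write serializes the object, and hdfs.read returns raw bytes that unserialize() turns back into the original object.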
f = hdfs.file("test.txt","w")
data(iris)
hdfs.write(iris,f)
hdfs.close(f)
f = hdfs.file("test.txt", "r")
dfserialized <- hdfs.read(f)
df <- unserialize(dfserialized)
df
hdfs.close(f)
Write data into HDFS
small.ints = to.dfs(1:10)
Read data back from HDFS
from.dfs('/tmp/file4eacda5ffa2')
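In practice there is no need to remember the temporary path: to.dfs() returns a big-data object that can be passed straight back to from.dfs():
small.ints = to.dfs(1:10)
from.dfs(small.ints)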
Sys.setenv(HADOOP_CMD="/usr/bin/hadoop")
Sys.setenv(HADOOP_STREAMING="/opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/hadoop-mapreduce/hadoop-streaming.jar")
library(rmr2)
library(rhdfs)
hdfs.init()
hdfs.mkdir("/user/cloudera/wordcount/data")
hdfs.put("wc_input.txt", "/user/cloudera/wordcount/data")
$ hadoop fs -mkdir /user/cloudera/wordcount/data
$ hadoop fs -put wc_input.txt /user/cloudera/wordcount/data
map <- function(k, lines) {
  # split each line on whitespace and emit a (word, 1) pair per word
  words.list <- strsplit(lines, '\\s')
  words <- unlist(words.list)
  return( keyval(words, 1) )
}
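For comparison, the equivalent mapper in classic Hadoop Java (the old mapred API) is considerably more verbose: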
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}
reduce <- function(word, counts) {
  # sum the counts emitted for each word
  keyval(word, sum(counts))
}
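And the corresponding Hadoop Java reducer: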
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}
hdfs.root <- 'wordcount'
hdfs.data <- file.path(hdfs.root, 'data')
hdfs.out <- file.path(hdfs.root, 'out')
wordcount <- function (input, output=NULL) {
mapreduce(input=input, output=output, input.format="text", map=map, reduce=reduce)
}
out <- wordcount(hdfs.data, hdfs.out)
results <- from.dfs(out)
results$key[order(results$val, decreasing = TRUE)][1:10]
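The same top-10 check can be run from the shell against the raw streaming output: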
$ hadoop fs -cat /user/cloudera/wordcount/out/part-00000 | sort -k 2 -nr | head -n 10
> a.time <- proc.time()
> small.ints2=1:100000
> result.normal = sapply(small.ints2, function(x) x^2)
> proc.time() - a.time
> b.time <- proc.time()
> small.ints= to.dfs(1:100000)
> result = mapreduce(input = small.ints, map = function(k,v) cbind(v,v^2))
> proc.time() - b.time
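For a job this small, the mapreduce version is far slower than sapply: Hadoop job startup, scheduling, and serialization overhead dominate the actual computation.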
rmr.options(backend = 'local')
out = mapreduce(to.dfs(1), map = function(k, v) rmr.str(v))
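rmr.str() prints the structure of its argument from inside a running job, which makes it the main tool for inspecting what map and reduce functions actually receive.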
# square every value
rmr.options(backend = 'local')
small.ints = to.dfs(1:100)
mapr = mapreduce(input = small.ints,
map = function(k,v) cbind(v,v^2))
result = from.dfs(mapr)
result
data(mtcars)
tapply(mtcars$mpg, mtcars$gear, sum)
rmr.options(backend = 'local')
out = from.dfs(to.dfs(mtcars))
out
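tapply sums mpg within each gear group directly in R; the sumup jobs below rebuild the same group-wise sum as a MapReduce computation.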
sumup = function(input,output = NULL){
## sumup-map
wc.map = function(., row) {
k = row$mpg
keyval(row$gear, k)}
mapreduce(
input = input,
output = output,
map = wc.map
)}
## sumup
sumup = function(input,output = NULL){
## sumup-map
wc.map = function(., row) {
k = row$mpg
rmr.str(row)
keyval(row$gear, k)}
mapreduce(
input = input,
output = output,
map = wc.map
)}
## sumup
sumup = function(input, output = NULL){
## sumup-map
wc.map = function(., row) {
k = row$mpg
keyval(row$gear, k)}
## sumup-reduce
wc.reduce =
function(word, val ) {
keyval(word, sum(val))}
## sumup-mapreduce
mapreduce(
input = input,
output = output,
map = wc.map,
reduce = wc.reduce
)}
## sumup
sumup = function(input, output = NULL){
## sumup-map
wc.map = function(., row) {
k = row$mpg
keyval(row$gear, k)}
## sumup-reduce
wc.reduce =
function(word, val ) {
rmr.str(val)
keyval(word, sum(val))}
## sumup-mapreduce
mapreduce(
input = input,
output = output,
map = wc.map,
reduce = wc.reduce
)}
rmr.options(backend = 'local')
out = from.dfs(sumup(to.dfs(keyval(NULL, mtcars))))
out
solutions = read.csv(file="solutions.csv", header=TRUE)
reviews = read.csv(file="reviews.csv", header=TRUE)
str(solutions)
str(reviews)
merge(reviews, solutions, by.x = "id", by.y = "id", all = TRUE)
rmr.options(backend = 'local')
## tablejoin: join two tagged tables on the values in their first column
tablejoin = function(input, output = NULL){
  ## tablejoin-map
  wc.map = function(., row) {
    keyval(row[1], row)
  }
  ## tablejoin-reduce
  wc.reduce =
    function(word, val ) {
      keyval(word, data.frame(left = val[1,], right = val[2,]))
    }
  ## tablejoin-mapreduce
  mapreduce(
    input = input,
    output = output,
    map = wc.map,
    reduce = wc.reduce
  )}
rv = to.dfs(keyval(NULL, cbind(reviews[1:3,], "rv")))
sl = to.dfs(keyval(NULL, cbind(solutions[1:3,], "sl")))
out = from.dfs(tablejoin(c(rv, sl)))
out
from.dfs(equijoin(left.input = to.dfs(keyval(1:10, 1:10^2)), right.input = to.dfs(keyval(1:10, 1:10^3))))
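equijoin() joins its two inputs on matching keys, much as merge() joins data frames on a common column; here the keys 1:10 are recycled across the longer value vectors, so each key matches several rows on both sides.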