Use the Quartet dataset
library(car)  # the Quartet dataset ships with the car package
data(Quartet)
str(Quartet)
plot(Quartet$x, Quartet$y1)
lmfit = lm(Quartet$y1~Quartet$x)
abline(lmfit, col="red")
plot(Quartet$x, Quartet$y1)
lmfit2 = lsfit(Quartet$x,Quartet$y1)
abline(lmfit2, col="red")
plot(Quartet$x, Quartet$y2)
lmfit = lm(Quartet$y2~poly(Quartet$x,2))
lines(sort(Quartet$x), lmfit$fit[order(Quartet$x)], col = "red")
plot(Quartet$x, Quartet$y3)
library(MASS)  # rlm() (robust linear regression) comes from MASS
lmfit = rlm(Quartet$y3~Quartet$x)
abline(lmfit, col="red")
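For completeness, the fourth Anscombe pair can be fitted the same way; this is a sketch assuming the dataset stores that pair as the x4/y4 columns (check str(Quartet) first).
plot(Quartet$x4, Quartet$y4)   # assumes columns x4/y4 exist
lmfit = lm(Quartet$y4~Quartet$x4)
abline(lmfit, col="red")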
require(mlbench)
## Loading required package: mlbench
data(BreastCancer)
BreastCancer <- na.omit(BreastCancer)
BreastCancer$Id <- NULL
set.seed(2)
ind <- sample(2, nrow(BreastCancer), replace = TRUE, prob=c(0.8, 0.2))
require(rpart)
## Loading required package: rpart
x.rp <- rpart(Class ~ ., data=BreastCancer[ind == 1,])
x.rp.pred <- predict(x.rp, type="class", newdata=BreastCancer[ind == 2,])
x.rp.prob <- predict(x.rp, type="prob", newdata=BreastCancer[ind == 2,])
plot(x.rp, main="Decision tree created using rpart")
library(e1071)
x.svm <- svm(Class~., data = BreastCancer[ind == 1,])
require(ROCR)
## Loading required package: ROCR
x.rp.prob.rocr <- prediction(x.rp.prob[,2], BreastCancer[ind == 2,'Class'])
x.rp.perf <- performance(x.rp.prob.rocr, "tpr","fpr")
plot(x.rp.perf, col=2, main="ROC curves comparing classification performance of five machine learning models")
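Further curves can be overlaid on the same ROC plot, for example for the SVM trained above; this is a sketch that assumes the model is refitted with probability=TRUE so class probabilities are available.
# Sketch: overlay the SVM's ROC curve (assumes probability=TRUE at fit and predict time)
x.svm <- svm(Class~., data = BreastCancer[ind == 1,], probability = TRUE)
x.svm.prob <- predict(x.svm, newdata=BreastCancer[ind == 2,], probability = TRUE)
x.svm.prob.rocr <- prediction(attr(x.svm.prob, "probabilities")[,"malignant"], BreastCancer[ind == 2,'Class'])
x.svm.perf <- performance(x.svm.prob.rocr, "tpr","fpr")
plot(x.svm.perf, col=4, add=TRUE)
legend("bottomright", legend=c("rpart", "svm"), col=c(2, 4), lty=1)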
dataset <- read.csv('eco_index.csv',head=TRUE, sep=',', row.names=1)
pc.cr <- princomp(dataset, cor = TRUE)
plot(pc.cr)
screeplot(pc.cr, type="lines")
abline(h=1, lty=3)
biplot(pc.cr)
barplot(sort(-pc.cr$scores[,1], TRUE))
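To see which indicators drive the first component, inspect the explained variance and the loadings directly.
summary(pc.cr)        # proportion of variance explained by each component
pc.cr$loadings[,1]    # contribution of each variable to the first component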
mydata <- read.csv('costumer_segment.txt',head=TRUE, sep='\t')
mydata <- scale(mydata)
d <- dist(mydata, method = "euclidean")
fit <- hclust(d, method="ward.D2")  # "ward" (= ward.D) is the pre-R-3.1 spelling of Ward's method
plot(fit)
k1 = 4
groups <- cutree(fit, k=k1)
rect.hclust(fit, k=k1, border="red")
fit <- kmeans(mydata, k1)
plot(mydata, col = fit$cluster)
library(cluster)
clusplot(mydata, fit$cluster, color=TRUE, shade=TRUE, lines=0)
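A quick cross-tabulation shows how the hierarchical cut lines up with the k-means assignment.
table(groups, fit$cluster)   # hierarchical groups (rows) vs. k-means clusters (columns)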
List files on HDFS
hadoop fs -ls
Put a file onto HDFS
hadoop fs -put test.txt ./
Download a file to the local filesystem
hadoop fs -get test.txt ./test2.txt
View file contents
hadoop fs -cat test.txt
Create a directory
hadoop fs -mkdir test
Delete data
hadoop fs -rm test.txt
$ sudo R
> install.packages(c("codetools", "Rcpp", "RJSONIO", "bitops", "digest", "functional", "stringr", "plyr", "reshape2", "rJava", "caTools"))
$ wget --no-check-certificate https://raw.githubusercontent.com/RevolutionAnalytics/rmr2/master/build/rmr2_3.1.2.tar.gz
$ sudo R CMD INSTALL rmr2_3.1.2.tar.gz
$ wget --no-check-certificate https://raw.github.com/RevolutionAnalytics/rhdfs/master/build/rhdfs_1.0.8.tar.gz
$ sudo HADOOP_CMD=/usr/bin/hadoop R CMD INSTALL rhdfs_1.0.8.tar.gz
Set HADOOP_CMD
which hadoop
Set HADOOP_STREAMING
locate streaming | grep jar | more  # find the hadoop-streaming jar
> Sys.setenv(HADOOP_CMD="/usr/bin/hadoop")
> Sys.setenv(HADOOP_STREAMING="/opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/hadoop-mapreduce/hadoop-streaming.jar")
> library(rhdfs)
> hdfs.init()
$ echo $JAVA_HOME
$ sudo JAVA_HOME=/usr/java/jdk1.7.0_45-cloudera R CMD javareconf
Initialize rhdfs
hdfs.init()
Put data from the local filesystem onto HDFS
hdfs.put('test.txt', './')
List the current directory
hdfs.ls('./')
Copy a file
hdfs.copy('test.txt', 'test2.txt')
Create a directory named test
hdfs.mkdir('test')
Download data to the local filesystem
hdfs.get('test.txt', '/home/cloudera/test3.txt')
Move data to a different location
hdfs.move('test.txt', './test/q1.txt')
Rename a file
hdfs.rename('./test/q1.txt', './test/test.txt')
Change directory permissions
hdfs.chmod('/RHadoop', permissions= '777')
Delete data
hdfs.delete('./test/')
hdfs.rm('./test/')
View file information
hdfs.file.info('./')
f = hdfs.file("test.txt","w")
data(iris)
hdfs.write(iris,f)
hdfs.close(f)
f = hdfs.file("test.txt", "r")
dfserialized <- hdfs.read(f)
df <- unserialize(dfserialized)
df
hdfs.close(f)
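For plain-text files, rhdfs also offers a line-oriented reader; the sketch below assumes hdfs.line.reader() is available in the installed rhdfs build.
# Sketch: read a text file line by line (assumes hdfs.line.reader() exists in this rhdfs version)
reader = hdfs.line.reader("test.txt")
lines = reader$read()
reader$close()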
Write data into HDFS
small.ints = to.dfs(1:10)
Read data back from HDFS
from.dfs('/tmp/file4eacda5ffa2')
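The temporary path printed above changes on every run, so in practice it is simpler to pass the object returned by to.dfs() straight back to from.dfs().
from.dfs(small.ints)   # the big.data object returned by to.dfs() can be read back directly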
Sys.setenv(HADOOP_CMD="/usr/bin/hadoop")
Sys.setenv(HADOOP_STREAMING="/opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/hadoop-mapreduce/hadoop-streaming.jar")
library(rmr2)
library(rhdfs)
hdfs.init()
hdfs.mkdir("/user/cloudera/wordcount/data")
hdfs.put("wc_input.txt", "/user/cloudera/wordcount/data")
$ hadoop fs -mkdir /user/cloudera/wordcount/data
$ hadoop fs -put wc_input.txt /user/cloudera/wordcount/data
map <- function(k, lines) {
  words.list <- strsplit(lines, '\\s')
  words <- unlist(words.list)
  return(keyval(words, 1))
}
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
      word.set(tokenizer.nextToken());
      output.collect(word, one);
    }
  }
}
reduce <- function(word, counts) {
  keyval(word, sum(counts))
}
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}
hdfs.root <- 'wordcount'
hdfs.data <- file.path(hdfs.root, 'data')
hdfs.out <- file.path(hdfs.root, 'out')
wordcount <- function (input, output=NULL) {
mapreduce(input=input, output=output, input.format="text", map=map, reduce=reduce)
}
out <- wordcount(hdfs.data, hdfs.out)
results <- from.dfs(out)
results$key[order(results$val, decreasing = TRUE)][1:10]
$ hadoop fs -cat /user/cloudera/wordcount/out/part-00000 | sort -k 2 -nr | head -n 10
> a.time <- proc.time()
> small.ints2=1:100000
> result.normal = sapply(small.ints2, function(x) x^2)
> proc.time() - a.time
> b.time <- proc.time()
> small.ints= to.dfs(1:100000)
> result = mapreduce(input = small.ints, map = function(k,v) cbind(v,v^2))
> proc.time() - b.time
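To confirm the Hadoop job produced the same squares as sapply(), the output can be read back and compared; a small sketch:
# Sketch: read the mapreduce output back and compare with the in-memory result
result.df = from.dfs(result)$val        # matrix with columns v and v^2
all.equal(sort(result.df[,2]), sort(result.normal))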
rmr.options(backend = 'local')
out = mapreduce(to.dfs(1), map = function(k, v) rmr.str(v))
# square every value
rmr.options(backend = 'local')
small.ints = to.dfs(1:100)
mapr = mapreduce(input = small.ints,
map = function(k,v) cbind(v,v^2))
result = from.dfs(mapr)
result
data(mtcars)
tapply(mtcars$mpg, mtcars$gear, sum)
rmr.options(backend = 'local')
out = from.dfs(to.dfs(mtcars))
out
sumup = function(input, output = NULL){
  ## sumup-map
  wc.map = function(., row) {
    k = row$mpg
    keyval(row$gear, k)}
  mapreduce(
    input = input,
    output = output,
    map = wc.map
  )}
## sumup
sumup = function(input, output = NULL){
  ## sumup-map
  wc.map = function(., row) {
    k = row$mpg
    rmr.str(row)
    keyval(row$gear, k)}
  mapreduce(
    input = input,
    output = output,
    map = wc.map
  )}
## sumup
wordcount = function(input, output = NULL){
  ## sumup-map
  wc.map = function(., row) {
    k = row$mpg
    keyval(row$gear, k)}
  ## sumup-reduce
  wc.reduce =
    function(word, val) {
      keyval(word, sum(val))}
  ## sumup-mapreduce
  mapreduce(
    input = input,
    output = output,
    map = wc.map,
    reduce = wc.reduce
  )}
## sumup
wordcount = function(input, output = NULL){
  ## sumup-map
  wc.map = function(., row) {
    k = row$mpg
    keyval(row$gear, k)}
  ## sumup-reduce
  wc.reduce =
    function(word, val) {
      rmr.str(val)
      keyval(word, sum(val))}
  ## sumup-mapreduce
  mapreduce(
    input = input,
    output = output,
    map = wc.map,
    reduce = wc.reduce
  )}
rmr.options(backend = 'local')
out = from.dfs(wordcount(to.dfs(keyval(NULL, mtcars))))
out
solutions = read.csv(file="solutions.csv", header=TRUE)
reviews = read.csv(file="reviews.csv", header=TRUE)
str(solutions)
str(reviews)
merge(reviews, solutions, by.x="id", by.y="id", all=TRUE)
rmr.options(backend = 'local')
rv = to.dfs(keyval(NULL, cbind(reviews[1:3,], "rv")))
sl = to.dfs(keyval(NULL, cbind(solutions[1:3,], "sl")))
out = from.dfs(tablejoin(c(rv, sl)))  # tablejoin() wraps the map/reduce shown below
out
## tablejoin-map
wc.map = function(., row) {
  keyval(row[1], row)
}
## tablejoin-reduce
wc.reduce =
  function(word, val) {
    keyval(word, data.frame(left = val[1,], right = val[2,]))
  }
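The tablejoin() wrapper used above is not spelled out; a minimal sketch that wires the map and reduce shown here into mapreduce() could look like this.
# Sketch: a tablejoin() wrapper combining the map/reduce above (hypothetical, not from the original)
tablejoin = function(input, output = NULL){
  mapreduce(
    input = input,
    output = output,
    map = wc.map,
    reduce = wc.reduce
  )}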
from.dfs(equijoin(left.input = to.dfs(keyval(1:10, 1:10^2)), right.input = to.dfs(keyval(1:10, 1:10^3))))