啟用 SparkR
# 設定SparkR 環境變數
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "/usr/local/spark")
}
# 載入SparkR
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
##
## Attaching package: 'SparkR'
## The following objects are masked from 'package:stats':
##
## cov, filter, lag, na.omit, predict, sd, var, window
## The following objects are masked from 'package:base':
##
## as.data.frame, colnames, colnames<-, drop, endsWith,
## intersect, rank, rbind, sample, startsWith, subset, summary,
## transform, union
# 本地端啟用
sparkR.session(master = "local[*]",
sparkConfig = list(spark.executor.memory = "600m",
spark.sql.shuffle.partitions = "3",
spark.default.parallelism="2"))
## Spark package found in SPARK_HOME: /usr/local/spark
## Launching java with spark-submit command /usr/local/spark/bin/spark-submit sparkr-shell /tmp/RtmpyJcPzS/backend_port643751cd153
## Java ref type org.apache.spark.sql.SparkSession id 1
# 使用 Cluster
# 將 master 設為 spark://master:7077
讀取房價資料
# 下載房價資料
download.file('https://github.com/ywchiu/sparkr/raw/master/data/lvr_prices.csv', 'lvr_prices.csv')
#讀取本地端的房價資訊
lvr_prices <- read.csv("~/lvr_prices.csv")
#檢視資料內容
str(lvr_prices)
## 'data.frame': 32535 obs. of 12 variables:
## $ X : int 11587 11588 11623 11652 11694 11768 11834 11866 11870 11871 ...
## $ area : Factor w/ 12 levels "中山區","中正區",..: 5 5 1 2 6 3 5 1 4 4 ...
## $ land_sqmeter : num 141.24 139.56 30.97 0.18 22.79 ...
## $ trading_ymd : Factor w/ 1202 levels "2013-01-01","2013-01-02",..: 3 3 2 7 2 3 6 1 11 11 ...
## $ floor : Factor w/ 238 levels "","一層","一層,二層",..: 46 46 185 2 46 176 92 185 46 77 ...
## $ finish_ymd : Factor w/ 6523 levels "1916-03-09","1921-10-01",..: 6264 6264 5808 5794 6141 2210 3394 5827 6298 6298 ...
## $ building_sqmeter : num 287.4 284.3 185.2 46.7 170.1 ...
## $ room : int 3 3 4 0 3 3 3 2 2 2 ...
## $ living_room : int 2 2 2 0 2 2 3 1 1 1 ...
## $ bath : int 3 3 2 0 2 1 2 1 1 1 ...
## $ total_price : num 59600000 55300000 23800000 970000 29700000 ...
## $ price_per_sqmeter: num 218147 204716 128510 20775 174613 ...
#檢視資料型態
class(lvr_prices)
## [1] "data.frame"
資料視覺化
# 用R 繪製
hist(lvr_prices$total_price)

# 取log10
hist(log10(lvr_prices$total_price))

載入資料
# 將資料轉換成Spark DataFrame
lvr_data <- as.DataFrame(lvr_prices)
# 或是直接使用read.df 讀取
df <- read.df("lvr_prices.csv", "csv")
檢視資料
#檢視資料
head(lvr_data)
## X area land_sqmeter trading_ymd floor finish_ymd building_sqmeter
## 1 11587 北投區 141.24 2013-01-03 三層 2012-07-24 287.35
## 2 11588 北投區 139.56 2013-01-03 三層 2012-07-24 284.27
## 3 11623 中山區 30.97 2013-01-02 四層 2008-06-24 185.20
## 4 11652 中正區 0.18 2013-01-07 一層 2008-05-13 46.69
## 5 11694 南港區 22.79 2013-01-02 三層 2011-04-14 170.09
## 6 11768 信義區 5.01 2013-01-03 十層 1980-04-21 50.97
## room living_room bath total_price price_per_sqmeter
## 1 3 2 3 59600000 218147
## 2 3 2 3 55300000 204716
## 3 4 2 2 23800000 128510
## 4 0 0 0 970000 20775
## 5 3 2 2 29700000 174613
## 6 3 2 1 7000000 137336
showDF(lvr_data)
## +-----+----+------------+-----------+-----+----------+----------------+----+-----------+----+-----------+-----------------+
## | X|area|land_sqmeter|trading_ymd|floor|finish_ymd|building_sqmeter|room|living_room|bath|total_price|price_per_sqmeter|
## +-----+----+------------+-----------+-----+----------+----------------+----+-----------+----+-----------+-----------------+
## |11587| 北投區| 141.24| 2013-01-03| 三層|2012-07-24| 287.35| 3| 2| 3| 5.96E7| 218147.0|
## |11588| 北投區| 139.56| 2013-01-03| 三層|2012-07-24| 284.27| 3| 2| 3| 5.53E7| 204716.0|
## |11623| 中山區| 30.97| 2013-01-02| 四層|2008-06-24| 185.2| 4| 2| 2| 2.38E7| 128510.0|
## |11652| 中正區| 0.18| 2013-01-07| 一層|2008-05-13| 46.69| 0| 0| 0| 970000.0| 20775.0|
## |11694| 南港區| 22.79| 2013-01-02| 三層|2011-04-14| 170.09| 3| 2| 2| 2.97E7| 174613.0|
## |11768| 信義區| 5.01| 2013-01-03| 十層|1980-04-21| 50.97| 3| 2| 1| 7000000.0| 137336.0|
## |11834| 北投區| 31.24| 2013-01-06| 五層|1985-10-15| 106.82| 3| 3| 2| 7200000.0| 67403.0|
## |11866| 中山區| 7.42| 2013-01-01| 四層|2008-08-13| 71.1| 2| 1| 1| 1.035E7| 145590.0|
## |11870| 內湖區| 9.43| 2013-01-11| 三層|2012-11-23| 38.55| 2| 1| 1| 6550000.0| 169909.0|
## |11871| 內湖區| 9.43| 2013-01-11| 二層|2012-11-23| 38.33| 2| 1| 1| 5770000.0| 150535.0|
## |11872| 內湖區| 9.43| 2013-01-11| 四層|2012-11-23| 38.55| 2| 1| 1| 6250000.0| 162127.0|
## |11873| 內湖區| 17.89| 2013-01-11| 二層|2012-11-23| 77.77| 2| 1| 1| 1.07E7| 151752.0|
## |11874| 內湖區| 17.89| 2013-01-11| 五層|2012-11-23| 77.77| 2| 1| 1| 1.17E7| 165934.0|
## |11875| 內湖區| 9.43| 2013-01-11| 五層|2012-11-23| 38.55| 2| 1| 1| 5850000.0| 151751.0|
## |11876| 內湖區| 17.89| 2013-01-11| 六層|2012-11-23| 77.77| 2| 1| 1| 1.228E7| 174160.0|
## |11877| 內湖區| 17.89| 2013-01-11| 七層|2012-11-23| 77.77| 2| 1| 1| 1.36E7| 192880.0|
## |11878| 內湖區| 9.43| 2013-01-11| 六層|2012-11-23| 38.55| 2| 1| 1| 5900000.0| 153048.0|
## |11879| 內湖區| 17.54| 2013-01-11| 四層|2012-11-23| 70.51| 2| 1| 1| 1.32E7| 187207.0|
## |12040| 北投區| 11.56| 2013-01-08| 五層|1984-04-19| 64.89| 0| 0| 0| 1000000.0| 15411.0|
## |12046| 信義區| 24.8| 2013-01-02| 四層|1989-06-20| 112.85| 3| 2| 2| 6104900.0| 54097.0|
## +-----+----+------------+-----------+-----+----------+----------------+----+-----------+----+-----------+-----------------+
## only showing top 20 rows
#觀看資料型態
class(lvr_data)
## [1] "SparkDataFrame"
## attr(,"package")
## [1] "SparkR"
#檢視資料Schema
printSchema(lvr_data)
## root
## |-- X: integer (nullable = true)
## |-- area: string (nullable = true)
## |-- land_sqmeter: double (nullable = true)
## |-- trading_ymd: string (nullable = true)
## |-- floor: string (nullable = true)
## |-- finish_ymd: string (nullable = true)
## |-- building_sqmeter: double (nullable = true)
## |-- room: integer (nullable = true)
## |-- living_room: integer (nullable = true)
## |-- bath: integer (nullable = true)
## |-- total_price: double (nullable = true)
## |-- price_per_sqmeter: double (nullable = true)
選擇欄位或篩選資料
#選擇欄位
head(select(lvr_data, lvr_data$area, lvr_data$total_price))
## area total_price
## 1 北投區 59600000
## 2 北投區 55300000
## 3 中山區 23800000
## 4 中正區 970000
## 5 南港區 29700000
## 6 信義區 7000000
#篩選資料
head(filter(lvr_data, lvr_data$area == '大安區'))
## X area land_sqmeter trading_ymd floor finish_ymd building_sqmeter
## 1 12073 大安區 7.69 2013-01-07 三層 2009-11-13 71.35
## 2 12967 大安區 6.66 2013-01-10 十一層 2009-11-13 41.06
## 3 12970 大安區 21.78 2013-01-19 七層 2011-11-07 113.20
## 4 13184 大安區 32.60 2013-01-14 六層 1985-12-30 144.85
## 5 13185 大安區 7.82 2013-01-09 六層 2009-07-15 54.30
## 6 13447 大安區 9.44 2013-01-10 十層 1978-09-12 69.57
## room living_room bath total_price price_per_sqmeter
## 1 1 1 1 14500000 203224
## 2 1 1 1 14000000 340964
## 3 0 0 0 25000000 292877
## 4 3 2 2 42200000 291336
## 5 1 1 1 13500000 248573
## 6 2 1 1 19356800 278235
Magrittr
# install.packages('magrittr')
library(magrittr)
使用magrittr 做資料跟欄位篩選
lvr_data %>%
select(lvr_data$area, lvr_data$total_price) %>%
filter(lvr_data$area == '大安區') %>%
head()
## area total_price
## 1 大安區 14500000
## 2 大安區 14000000
## 3 大安區 25000000
## 4 大安區 42200000
## 5 大安區 13500000
## 6 大安區 19356800
資料彙總與排序
#彙總價格資料
head(summarize(groupBy(lvr_data, lvr_data$area), price_sum = sum(lvr_data$total_price)))
## area price_sum
## 1 中正區 38099724464
## 2 內湖區 128934127518
## 3 大安區 85905894897
## 4 萬華區 18994267967
## 5 南港區 48468724590
## 6 信義區 55300572903
#彙總並排序資料
summarize(groupBy(lvr_data, lvr_data$area), price_sum = sum(lvr_data$total_price)) %>% arrange(desc(.$price_sum)) %>% head()
## area price_sum
## 1 內湖區 128934127518
## 2 士林區 101092427524
## 3 大安區 85905894897
## 4 北投區 85262311220
## 5 中山區 77432241627
## 6 文山區 76701087517
# gapplyCollect
result <- gapplyCollect(
lvr_data,
"area",
function(key, x) {
y <- data.frame(key, sum(x$total_price))
colnames(y) <- c("area", "sum_price")
y
})
# gapply
schema <- structType(structField("area", "string"), structField("total_price", "double"))
result <- gapply(
lvr_data,
"area",
function(key, x) {
y <- data.frame(key, sum(x$total_price), stringsAsFactors = FALSE)
},
schema)
head(collect(arrange(result, "total_price", decreasing = TRUE)))
## area total_price
## 1 內湖區 128934127518
## 2 士林區 101092427524
## 3 大安區 85905894897
## 4 北投區 85262311220
## 5 中山區 77432241627
## 6 文山區 76701087517
轉換欄位 (新增house_age)
lvr_data$house_age <- (datediff(date_format(lvr_data$trading_ymd, "yyyy-MM-dd"), date_format(lvr_data$finish_ymd, "yyyy-MM-dd")))/ 365
根據各區計算每月平均價格變化
# 取得交易年月資料
lvr_data$trading_ym <- date_format(lvr_data$trading_ymd, "yyyy-MM-01")
# 根據區域與年月計算平均交易價格
mean_prices <- summarize(groupBy(lvr_data, lvr_data$area, lvr_data$trading_ym), price_avg = mean(lvr_data$total_price))
# 取得平均價格
mean_df <- collect(mean_prices)
Spark SQL
createOrReplaceTempView(lvr_data, "lvr_data")
lvr_sql <- sql("SELECT area, avg(total_price) FROM lvr_data WHERE house_age < 30 group by area")
head(lvr_sql)
## area avg(total_price)
## 1 中正區 33464565
## 2 內湖區 26294113
## 3 大安區 39127236
## 4 萬華區 15108700
## 5 南港區 30674185
## 6 信義區 35552442