啟用 SparkR

# 設定SparkR 環境變數
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/usr/local/spark")
}

# 載入SparkR
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
## 
## Attaching package: 'SparkR'
## The following objects are masked from 'package:stats':
## 
##     cov, filter, lag, na.omit, predict, sd, var, window
## The following objects are masked from 'package:base':
## 
##     as.data.frame, colnames, colnames<-, drop, endsWith,
##     intersect, rank, rbind, sample, startsWith, subset, summary,
##     transform, union
#  本地端啟用
sparkR.session(master = "local[*]", 
               sparkConfig = list(spark.executor.memory = "600m", 
                                  spark.sql.shuffle.partitions = "3",
                                  spark.default.parallelism="2"))
## Spark package found in SPARK_HOME: /usr/local/spark
## Launching java with spark-submit command /usr/local/spark/bin/spark-submit   sparkr-shell /tmp/RtmpyJcPzS/backend_port643751cd153
## Java ref type org.apache.spark.sql.SparkSession id 1
# 使用 Cluster
# 將 master 設為 spark://master:7077

讀取房價資料

# 下載房價資料
download.file('https://github.com/ywchiu/sparkr/raw/master/data/lvr_prices.csv', 'lvr_prices.csv')

#讀取本地端的房價資訊
lvr_prices <- read.csv("~/lvr_prices.csv")

#檢視資料內容
str(lvr_prices)
## 'data.frame':    32535 obs. of  12 variables:
##  $ X                : int  11587 11588 11623 11652 11694 11768 11834 11866 11870 11871 ...
##  $ area             : Factor w/ 12 levels "中山區","中正區",..: 5 5 1 2 6 3 5 1 4 4 ...
##  $ land_sqmeter     : num  141.24 139.56 30.97 0.18 22.79 ...
##  $ trading_ymd      : Factor w/ 1202 levels "2013-01-01","2013-01-02",..: 3 3 2 7 2 3 6 1 11 11 ...
##  $ floor            : Factor w/ 238 levels "","一層","一層,二層",..: 46 46 185 2 46 176 92 185 46 77 ...
##  $ finish_ymd       : Factor w/ 6523 levels "1916-03-09","1921-10-01",..: 6264 6264 5808 5794 6141 2210 3394 5827 6298 6298 ...
##  $ building_sqmeter : num  287.4 284.3 185.2 46.7 170.1 ...
##  $ room             : int  3 3 4 0 3 3 3 2 2 2 ...
##  $ living_room      : int  2 2 2 0 2 2 3 1 1 1 ...
##  $ bath             : int  3 3 2 0 2 1 2 1 1 1 ...
##  $ total_price      : num  59600000 55300000 23800000 970000 29700000 ...
##  $ price_per_sqmeter: num  218147 204716 128510 20775 174613 ...
#檢視資料型態
class(lvr_prices)
## [1] "data.frame"

資料視覺化

# 用R 繪製
hist(lvr_prices$total_price)

# 取log10
hist(log10(lvr_prices$total_price))

載入資料

# 將資料轉換成Spark DataFrame
lvr_data <- as.DataFrame(lvr_prices)

# 或是直接使用read.df 讀取
df <- read.df("lvr_prices.csv", "csv")

檢視資料

#檢視資料
head(lvr_data) 
##       X   area land_sqmeter trading_ymd floor finish_ymd building_sqmeter
## 1 11587 北投區       141.24  2013-01-03  三層 2012-07-24           287.35
## 2 11588 北投區       139.56  2013-01-03  三層 2012-07-24           284.27
## 3 11623 中山區        30.97  2013-01-02  四層 2008-06-24           185.20
## 4 11652 中正區         0.18  2013-01-07  一層 2008-05-13            46.69
## 5 11694 南港區        22.79  2013-01-02  三層 2011-04-14           170.09
## 6 11768 信義區         5.01  2013-01-03  十層 1980-04-21            50.97
##   room living_room bath total_price price_per_sqmeter
## 1    3           2    3    59600000            218147
## 2    3           2    3    55300000            204716
## 3    4           2    2    23800000            128510
## 4    0           0    0      970000             20775
## 5    3           2    2    29700000            174613
## 6    3           2    1     7000000            137336
showDF(lvr_data)
## +-----+----+------------+-----------+-----+----------+----------------+----+-----------+----+-----------+-----------------+
## |    X|area|land_sqmeter|trading_ymd|floor|finish_ymd|building_sqmeter|room|living_room|bath|total_price|price_per_sqmeter|
## +-----+----+------------+-----------+-----+----------+----------------+----+-----------+----+-----------+-----------------+
## |11587| 北投區|      141.24| 2013-01-03|   三層|2012-07-24|          287.35|   3|          2|   3|     5.96E7|         218147.0|
## |11588| 北投區|      139.56| 2013-01-03|   三層|2012-07-24|          284.27|   3|          2|   3|     5.53E7|         204716.0|
## |11623| 中山區|       30.97| 2013-01-02|   四層|2008-06-24|           185.2|   4|          2|   2|     2.38E7|         128510.0|
## |11652| 中正區|        0.18| 2013-01-07|   一層|2008-05-13|           46.69|   0|          0|   0|   970000.0|          20775.0|
## |11694| 南港區|       22.79| 2013-01-02|   三層|2011-04-14|          170.09|   3|          2|   2|     2.97E7|         174613.0|
## |11768| 信義區|        5.01| 2013-01-03|   十層|1980-04-21|           50.97|   3|          2|   1|  7000000.0|         137336.0|
## |11834| 北投區|       31.24| 2013-01-06|   五層|1985-10-15|          106.82|   3|          3|   2|  7200000.0|          67403.0|
## |11866| 中山區|        7.42| 2013-01-01|   四層|2008-08-13|            71.1|   2|          1|   1|    1.035E7|         145590.0|
## |11870| 內湖區|        9.43| 2013-01-11|   三層|2012-11-23|           38.55|   2|          1|   1|  6550000.0|         169909.0|
## |11871| 內湖區|        9.43| 2013-01-11|   二層|2012-11-23|           38.33|   2|          1|   1|  5770000.0|         150535.0|
## |11872| 內湖區|        9.43| 2013-01-11|   四層|2012-11-23|           38.55|   2|          1|   1|  6250000.0|         162127.0|
## |11873| 內湖區|       17.89| 2013-01-11|   二層|2012-11-23|           77.77|   2|          1|   1|     1.07E7|         151752.0|
## |11874| 內湖區|       17.89| 2013-01-11|   五層|2012-11-23|           77.77|   2|          1|   1|     1.17E7|         165934.0|
## |11875| 內湖區|        9.43| 2013-01-11|   五層|2012-11-23|           38.55|   2|          1|   1|  5850000.0|         151751.0|
## |11876| 內湖區|       17.89| 2013-01-11|   六層|2012-11-23|           77.77|   2|          1|   1|    1.228E7|         174160.0|
## |11877| 內湖區|       17.89| 2013-01-11|   七層|2012-11-23|           77.77|   2|          1|   1|     1.36E7|         192880.0|
## |11878| 內湖區|        9.43| 2013-01-11|   六層|2012-11-23|           38.55|   2|          1|   1|  5900000.0|         153048.0|
## |11879| 內湖區|       17.54| 2013-01-11|   四層|2012-11-23|           70.51|   2|          1|   1|     1.32E7|         187207.0|
## |12040| 北投區|       11.56| 2013-01-08|   五層|1984-04-19|           64.89|   0|          0|   0|  1000000.0|          15411.0|
## |12046| 信義區|        24.8| 2013-01-02|   四層|1989-06-20|          112.85|   3|          2|   2|  6104900.0|          54097.0|
## +-----+----+------------+-----------+-----+----------+----------------+----+-----------+----+-----------+-----------------+
## only showing top 20 rows
#觀看資料型態
class(lvr_data)
## [1] "SparkDataFrame"
## attr(,"package")
## [1] "SparkR"
#檢視資料Schema
printSchema(lvr_data)
## root
##  |-- X: integer (nullable = true)
##  |-- area: string (nullable = true)
##  |-- land_sqmeter: double (nullable = true)
##  |-- trading_ymd: string (nullable = true)
##  |-- floor: string (nullable = true)
##  |-- finish_ymd: string (nullable = true)
##  |-- building_sqmeter: double (nullable = true)
##  |-- room: integer (nullable = true)
##  |-- living_room: integer (nullable = true)
##  |-- bath: integer (nullable = true)
##  |-- total_price: double (nullable = true)
##  |-- price_per_sqmeter: double (nullable = true)

選擇欄位或篩選資料

#選擇欄位
head(select(lvr_data, lvr_data$area, lvr_data$total_price))
##     area total_price
## 1 北投區    59600000
## 2 北投區    55300000
## 3 中山區    23800000
## 4 中正區      970000
## 5 南港區    29700000
## 6 信義區     7000000
#篩選資料
head(filter(lvr_data, lvr_data$area == '大安區'))
##       X   area land_sqmeter trading_ymd  floor finish_ymd building_sqmeter
## 1 12073 大安區         7.69  2013-01-07   三層 2009-11-13            71.35
## 2 12967 大安區         6.66  2013-01-10 十一層 2009-11-13            41.06
## 3 12970 大安區        21.78  2013-01-19   七層 2011-11-07           113.20
## 4 13184 大安區        32.60  2013-01-14   六層 1985-12-30           144.85
## 5 13185 大安區         7.82  2013-01-09   六層 2009-07-15            54.30
## 6 13447 大安區         9.44  2013-01-10   十層 1978-09-12            69.57
##   room living_room bath total_price price_per_sqmeter
## 1    1           1    1    14500000            203224
## 2    1           1    1    14000000            340964
## 3    0           0    0    25000000            292877
## 4    3           2    2    42200000            291336
## 5    1           1    1    13500000            248573
## 6    2           1    1    19356800            278235

Magrittr

# install.packages('magrittr')
library(magrittr)

使用magrittr 做資料跟欄位篩選

lvr_data %>% 
   select(lvr_data$area, lvr_data$total_price) %>% 
   filter(lvr_data$area == '大安區') %>% 
   head()
##     area total_price
## 1 大安區    14500000
## 2 大安區    14000000
## 3 大安區    25000000
## 4 大安區    42200000
## 5 大安區    13500000
## 6 大安區    19356800

資料彙總與排序

#彙總價格資料
head(summarize(groupBy(lvr_data, lvr_data$area), price_sum = sum(lvr_data$total_price)))
##     area    price_sum
## 1 中正區  38099724464
## 2 內湖區 128934127518
## 3 大安區  85905894897
## 4 萬華區  18994267967
## 5 南港區  48468724590
## 6 信義區  55300572903
#彙總並排序資料
summarize(groupBy(lvr_data, lvr_data$area), price_sum = sum(lvr_data$total_price)) %>% arrange(desc(.$price_sum)) %>% head()
##     area    price_sum
## 1 內湖區 128934127518
## 2 士林區 101092427524
## 3 大安區  85905894897
## 4 北投區  85262311220
## 5 中山區  77432241627
## 6 文山區  76701087517
# gapplyCollect
result <- gapplyCollect(
  lvr_data,
  "area",
  function(key, x) {
    y <- data.frame(key, sum(x$total_price))
    colnames(y) <- c("area", "sum_price")
    y
  })

# gapply
schema <- structType(structField("area", "string"), structField("total_price", "double"))
result <- gapply(
  lvr_data,
  "area",
  function(key, x) {
    y <- data.frame(key, sum(x$total_price), stringsAsFactors = FALSE)
  },
  schema)
head(collect(arrange(result, "total_price", decreasing = TRUE)))
##     area  total_price
## 1 內湖區 128934127518
## 2 士林區 101092427524
## 3 大安區  85905894897
## 4 北投區  85262311220
## 5 中山區  77432241627
## 6 文山區  76701087517

轉換欄位 (新增house_age)

lvr_data$house_age <- (datediff(date_format(lvr_data$trading_ymd, "yyyy-MM-dd"), date_format(lvr_data$finish_ymd, "yyyy-MM-dd")))/ 365

根據各區計算每月平均價格變化

# 取得交易年月資料
lvr_data$trading_ym <- date_format(lvr_data$trading_ymd, "yyyy-MM-01")

# 根據區域與年月計算平均交易價格
mean_prices <- summarize(groupBy(lvr_data, lvr_data$area, lvr_data$trading_ym), price_avg = mean(lvr_data$total_price))

# 取得平均價格
 mean_df <- collect(mean_prices)

Spark SQL

createOrReplaceTempView(lvr_data, "lvr_data")

lvr_sql <- sql("SELECT area, avg(total_price) FROM lvr_data WHERE house_age < 30 group by area")
head(lvr_sql)
##     area avg(total_price)
## 1 中正區         33464565
## 2 內湖區         26294113
## 3 大安區         39127236
## 4 萬華區         15108700
## 5 南港區         30674185
## 6 信義區         35552442

關閉SparkR

sparkR.stop()