# Start from a clean workspace so objects left over from an earlier session
# cannot leak into this analysis. This removes objects only (including hidden,
# dot-prefixed ones); attached packages and previously set options stay as-is.
rm(list = ls(all.names = TRUE))
# Print numbers with 5 significant digits and penalise scientific notation.
options(digits = 5, scipen = 10)

library(Matrix)
library(dplyr)      # load before sparklyr
library(sparklyr)   # the dplyr in spark 
# Connect to Spark Context ----
# Tell sparklyr which Spark installation to use.
Sys.setenv(SPARK_HOME = "/usr/local/spark/spark-2.1.0-bin-hadoop2.7/")

# Build the connection configuration and override the memory settings.
config <- spark_config()
# config$spark.ui.port = "4044"
config$spark.executor.memory <- "4G"
config$spark.driver.memory <- "4G"
# NOTE(review): this key is YARN-specific while the master URL below is a
# standalone one (spark://) -- confirm that it has any effect here.
config$spark.yarn.executor.memoryOverhead <- "4096"

# `sc` is the connection handle used by every Spark call below.
sc <- spark_connect(master = "spark://hnamenode:7077", config = config)
# Read Parquet ----
# Start the wall-clock timer that is reported after each section.
t0 <- Sys.time()
# Base HDFS directory; the Parquet data is read from <acquire>pq01/TX.
acquire <- "hdfs://192.168.1.100:9000/home/tonychuo/acquire/"
# Register the Parquet data under the Spark table name "tx" and keep a
# dplyr reference to it in `tx`.
tx <- spark_read_parquet(sc, name = "tx", path = paste0(acquire, "pq01/TX"))
# tbl_cache(sc, "tx")
Sys.time() - t0
## Time difference of 54.846 secs


# A. Data Exploration (資料探索) ----

# Total number of rows in `tx` (evaluated in Spark; only the count comes back).
tx %>% count()              # 349,655,789
## # Source:   lazy query [?? x 1]
## # Database: spark_connection
##           n
##       <dbl>
## 1 349655789
# Preview the first rows together with the column types.
tx %>% head()
## # Source:   lazy query [?? x 12]
## # Database: spark_connection
##      id chain date   dept category company brand  size unit    qty amount
##   <dbl> <int> <chr> <int>    <int>   <dbl> <int> <dbl> <chr> <int>  <dbl>
## 1 86246   205 2012…    37     3703  1.05e8  2820  10.8 OZ        1   1.77
## 2 86246   205 2012…    63     6399  1.02e8 16431  32   OZ        1   2.79
## 3 86246   205 2012…    97     9753  1.02e9     0   1   CT        1  10.2 
## 4 86246   205 2012…    58     5827  1.04e8 14162  16   OZ        2   6   
## 5 86246   205 2012…    53     5307  1.01e8 15266  12   OZ        1   2.29
## 6 86246   205 2012…    97     9753  1.02e9     0   1   CT        1   2.5 
## # ... with 1 more variable: tid <int>
# Count the distinct values of the identifier-like columns. The columns are
# selected by name (previously by position: 1, 2, 4:7, 12) so the result does
# not silently change if the column order of `tx` ever differs.
tx %>%
  summarise_at(
    c("id", "chain", "dept", "category", "company", "brand", "tid"),
    n_distinct
  ) %>%
  collect() %>%
  data.frame()
##       id chain dept category company brand      tid
## 1 311541   134   83      836   32773 35689 26496798
# First and last date present in the data.
tx %>% summarise(d0 = min(date), d1 = max(date)) # 2012-03-02 2013-07-28
## # Source:   lazy query [?? x 2]
## # Database: spark_connection
##   d0         d1        
##   <chr>      <chr>     
## 1 2012-03-02 2013-07-28
# Elapsed wall-clock time since `t0`.
Sys.time() - t0
## Time difference of 1.2054 mins


# B. Data Aggregation (RFM) (資料彙整) ----

# Build one recency / frequency / monetary (RFM) row per `id`
# (presumably the customer identifier -- confirm against the data dictionary).
cust <- tx %>%
  # Step 1: reduce every `tid` group (presumably one transaction) to a single
  # row that carries the group's summed `amount`; the row kept is the one
  # ranked first by descending `id`.
  group_by(tid) %>%
  mutate(
    total = sum(amount),
    i = row_number(desc(id))
  ) %>%
  ungroup() %>%
  filter(i == 1) %>%
  select(id, date, total) %>%
  # Step 2: summarise the remaining rows per `id`.
  group_by(id) %>%
  summarise(
    # datediff() is evaluated by Spark SQL rather than by R: days from the
    # latest `date` of this `id` to 2013-07-29.
    recent = datediff("2013-07-29", max(date)),
    # Number of rows kept in step 1 for this `id`.
    freq = n(),
    # Mean of the per-`tid` totals.
    money = mean(total)
  ) %>%
  # Bring the result into R as a plain data.frame.
  collect() %>%
  data.frame()
head(cust)
##           id recent freq  money
## 1  454431700    127  194 43.593
## 2  475495035     31   81 17.900
## 3 3743178100     81  158 46.889
## 4 2514317045     43   26 80.253
## 5  473811043    131   94 32.827
## 6 2800652478    168   17 26.311
# Elapsed wall-clock time since `t0`.
Sys.time() - t0
## Time difference of 1.5044 mins


# C. (Category, Company) Mapping Matrix ((品類、公司)對應矩陣) ----

# Cross-tabulate category x company as a sparse matrix: each cell holds the
# number of `tx` rows for that (category, company) pair. Row and column indices
# are the integer codes of factor(category) and factor(company).
cat_com <- local({
  # Pair counts are computed in Spark; only the aggregated rows are collected.
  pair_counts <- tx %>%
    group_by(category, company) %>%
    tally() %>%
    collect()
  sparseMatrix(
    i = as.integer(factor(pair_counts$category)),
    j = as.integer(factor(pair_counts$company)),
    x = pair_counts$n
  )
})
dim(cat_com)                # 836 32773
## [1]   836 32773
# Distribution of the column sums (first 10 distinct values).
colSums(cat_com) %>% table() %>% head(10)
## .
##    1    2    3    4    5    6    7    8    9   10 
## 2541 1368 1079  861  729  531  509  445  402  375
# Elapsed wall-clock time since `t0`.
Sys.time() - t0
## Time difference of 1.5656 mins


# Always Remember to Disconnect !!! ----
# PLEASE always remember to disconnect: this closes the Spark connection `sc`
# that was opened with spark_connect().
spark_disconnect(sc)