# Start from an empty workspace (hidden objects included) so that reruns of
# this script do not pick up leftovers from an earlier session.
rm(list = ls(all.names = TRUE))
# Print numbers with 5 significant digits and penalise scientific notation.
options(digits = 5, scipen = 10)

library(Matrix)
library(dplyr)      # load before sparklyr
library(sparklyr)   # the dplyr in spark 
# Connect to Spark Context ----
# Point sparklyr at the local Spark installation.
Sys.setenv(SPARK_HOME = "/usr/local/spark/spark-2.1.0-bin-hadoop2.7/")

# Start from sparklyr's default configuration and override memory settings.
config <- spark_config()
# config$spark.ui.port = "4044"
config[["spark.executor.memory"]] <- "4G"
config[["spark.driver.memory"]] <- "4G"
# NOTE(review): this is a YARN-specific key while the master URL below uses
# the spark:// scheme; presumably it has no effect here -- confirm.
config[["spark.yarn.executor.memoryOverhead"]] <- "4096"

# Open the connection; `sc` is used by every later section of the script.
sc <- spark_connect(master = "spark://hnamenode:7077", config = config)
# Read Parquet ----
# Start of the wall-clock timer reported after each section.
t0 <- Sys.time()

# Base HDFS directory of the data set; the transaction table is under pq01/TX.
acquire <- "hdfs://192.168.1.100:9000/home/tonychuo/acquire/"
# Register the parquet files with Spark under the table name "tx".
tx <- spark_read_parquet(
  sc,
  name = "tx",
  path = paste0(acquire, "pq01/TX")
)
# tbl_cache(sc, "tx")
Sys.time() - t0
## Time difference of 53.708 secs


# A. 資料探索 (Data Exploration) ----

# Row count of the transaction table, computed inside Spark.
count(tx)                   # 349,655,789
## # Source:   lazy query [?? x 1]
## # Database: spark_connection
##           n
##       <dbl>
## 1 349655789
# First rows, to inspect the column layout.
head(tx)
## # Source:   lazy query [?? x 12]
## # Database: spark_connection
##      id chain date        dept category company brand  size unit    qty
##   <dbl> <int> <date>     <int>    <int>   <dbl> <int> <dbl> <chr> <int>
## 1 86246   205 2012-07-12    37     3703  1.05e8  2820  10.8 OZ        1
## 2 86246   205 2012-07-12    63     6399  1.02e8 16431  32   OZ        1
## 3 86246   205 2012-07-12    97     9753  1.02e9     0   1   CT        1
## 4 86246   205 2012-07-12    58     5827  1.04e8 14162  16   OZ        2
## 5 86246   205 2012-07-12    53     5307  1.01e8 15266  12   OZ        1
## 6 86246   205 2012-07-12    97     9753  1.02e9     0   1   CT        1
## # ... with 2 more variables: amount <dbl>, tid <int>
# Number of distinct values per key column. The columns are selected by name
# (previously by position: 1, 2, 4:7, 12) so that a change in the parquet
# column order cannot silently summarise the wrong columns.
tx %>%
  summarise_at(
    vars(id, chain, dept, category, company, brand, tid),
    n_distinct
  ) %>%
  collect() %>%
  data.frame()
##       id chain dept category company brand      tid
## 1 311541   134   83      836   32773 35689 26496798
# Earliest and latest transaction date.
# NOTE(review): an earlier inline note here read "2012-03-02 2013-07-28",
# which is one day later than the captured output below -- confirm which is
# correct (possibly a time-zone shift when dates are displayed).
tx %>% summarise(d0 = min(date), d1 = max(date))
## # Source:   lazy query [?? x 2]
## # Database: spark_connection
##   d0         d1        
##   <date>     <date>    
## 1 2012-03-01 2013-07-27
Sys.time() - t0
## Time difference of 1.17 mins


# B. 資料彙整 (Data Aggregation: RFM) ----

# Build one row per customer with recency / frequency / monetary measures.
#   1. Within each transaction (tid) compute the total amount and rank the
#      rows, then keep a single row per tid.
#   2. Per customer (id): `recent` is datediff() between "2013-07-29" and the
#      customer's latest date, `freq` is the number of kept rows, and `money`
#      is the mean transaction total.
# The result is pulled into R as a plain data.frame.
cust <- tx %>%
  group_by(tid) %>%
  mutate(
    total = sum(amount),
    row_in_tid = row_number(desc(id))
  ) %>%
  ungroup() %>%
  filter(row_in_tid == 1) %>%
  select(id, date, total) %>%
  group_by(id) %>%
  summarise(
    recent = datediff("2013-07-29", max(date)),
    freq = n(),
    money = mean(total)
  ) %>%
  collect() %>%
  data.frame()
head(cust)
##           id recent freq  money
## 1  757581761    128  151 68.548
## 2 2755622792     41  172 24.977
## 3  498251464    104  158 29.107
## 4 4629518018    103   57 95.354
## 5  764182089     38  316 22.560
## 6  294760799     81  170 31.387
Sys.time() - t0
## Time difference of 1.4794 mins


# C. (品類、公司)對應矩陣 ((Category, Company) Mapping Matrix) ----

# Category x company count matrix.
# Rows are categories, columns are companies, and each cell holds the number
# of transaction rows for that (category, company) pair. The factor levels are
# attached as dimnames so that a row/column index can be mapped back to the
# original category/company id (the matrix previously carried no labels).
cat_com <- tx %>%
  group_by(category, company) %>%
  tally() %>%
  collect() %>%
  with({
    category_f <- factor(category)
    company_f <- factor(company)
    sparseMatrix(
      i = as.integer(category_f),
      j = as.integer(company_f),
      x = n,
      dimnames = list(levels(category_f), levels(company_f))
    )
  })
dim(cat_com)                # 836 32773
## [1]   836 32773
# Distribution of column totals across companies.
# NOTE(review): colSums() adds up the cell counts, i.e. rows per company; if
# the intent is the number of categories each company appears in, this would
# need colSums(cat_com > 0) -- confirm.
colSums(cat_com) %>% table %>% head(10)
## .
##    1    2    3    4    5    6    7    8    9   10 
## 2541 1368 1079  861  729  531  509  445  402  375
Sys.time() - t0
## Time difference of 1.5353 mins


# Always Remember to Disconnect ----
# Always close the Spark connection once the analysis is finished.
sparklyr::spark_disconnect(sc)