# Start from an empty global environment and set printing options.
# NOTE(review): rm(list = ls()) only removes objects -- loaded packages and
# options survive -- and it wipes the caller's workspace if this script is
# source()d; running it in a fresh session (e.g. via Rscript) is a more
# reliable reset.
# `all.names` is spelled out: `ls(all = TRUE)` only worked through partial
# argument matching.
rm(list = ls(all.names = TRUE))
options(digits = 5, scipen = 10)
library(Matrix)
library(dplyr) # load before sparklyr
library(sparklyr) # the dplyr in spark
# Connect to Spark Context ----
# Point sparklyr at the local Spark installation and open a connection to the
# standalone cluster master.
Sys.setenv(SPARK_HOME = "/usr/local/spark/spark-2.1.0-bin-hadoop2.7/")
config <- spark_config()
# config$spark.ui.port <- "4044"
config$spark.executor.memory <- "4G"
# Driver memory has to reach spark-submit before the driver JVM starts. In
# client mode, setting spark.driver.memory on the SparkConf afterwards has no
# effect, so pass it through sparklyr's shell options. The SparkConf entry is
# kept for backward compatibility only.
config$`sparklyr.shell.driver-memory` <- "4G"
config$spark.driver.memory <- "4G"
# NOTE(review): spark.yarn.* settings only apply when the master is YARN; with
# the standalone spark:// master below this entry is ignored.
config$spark.yarn.executor.memoryOverhead <- "4096"
sc <- spark_connect(master = "spark://hnamenode:7077", config = config)
# Read Parquet ----
# Register the transaction line items stored as Parquet on HDFS as the Spark
# table "tx", and report how long the read took.
t0 <- Sys.time()
acquire <- "hdfs://192.168.1.100:9000/home/tonychuo/acquire/"
tx <- spark_read_parquet(sc, name = "tx", path = paste0(acquire, "pq01/TX"))
# tbl_cache(sc, "tx")
difftime(Sys.time(), t0)
## Time difference of 54.846 secs
# A. 資料探索 (Data Exploration) ----
## # Source: lazy query [?? x 1]
## # Database: spark_connection
## n
## <dbl>
## 1 349655789
## # Source: lazy query [?? x 12]
## # Database: spark_connection
## id chain date dept category company brand size unit qty amount
## <dbl> <int> <chr> <int> <int> <dbl> <int> <dbl> <chr> <int> <dbl>
## 1 86246 205 2012… 37 3703 1.05e8 2820 10.8 OZ 1 1.77
## 2 86246 205 2012… 63 6399 1.02e8 16431 32 OZ 1 2.79
## 3 86246 205 2012… 97 9753 1.02e9 0 1 CT 1 10.2
## 4 86246 205 2012… 58 5827 1.04e8 14162 16 OZ 2 6
## 5 86246 205 2012… 53 5307 1.01e8 15266 12 OZ 1 2.29
## 6 86246 205 2012… 97 9753 1.02e9 0 1 CT 1 2.5
## # ... with 1 more variable: tid <int>
# Number of distinct values for each key/ID-like column.
# Columns are named explicitly rather than by position (previously
# c(1, 2, 4:7, 12)), so a reordered or extended schema cannot silently change
# which columns are counted. Order matches the original selection.
tx %>%
  summarise_at(
    c("id", "chain", "dept", "category", "company", "brand", "tid"),
    n_distinct
  ) %>%
  collect() %>%
  data.frame()
## id chain dept category company brand tid
## 1 311541 134 83 836 32773 35689 26496798
# First (d0) and last (d1) transaction dates. `date` is a character column,
# so min/max compare "YYYY-MM-DD" strings, which order chronologically.
summarise(tx, d0 = min(date), d1 = max(date)) # 2012-03-02 2013-07-28
## # Source: lazy query [?? x 2]
## # Database: spark_connection
## d0 d1
## <chr> <chr>
## 1 2012-03-02 2013-07-28
## Time difference of 1.2054 mins
# B. 資料彙整 (RFM) -- Data Aggregation (RFM) ----
# RFM summary per customer (`id`), collected into a local data.frame:
#   1. Within each transaction (`tid`), attach the transaction's total amount
#      and rank its line items, so that exactly one row per tid survives.
#   2. Per customer:
#      recent = days from the latest purchase date to 2013-07-29 (Spark SQL
#               datediff(end, start); the data's last date is 2013-07-28),
#      freq   = number of transactions,
#      money  = mean transaction total.
# NOTE(review): the surviving row's `id`/`date` stand in for the whole
# transaction. That assumes every line item of a tid shares one customer and
# one date -- confirm against how `tid` was constructed.
cust <- tx %>%
  group_by(tid) %>%
  mutate(total = sum(amount), line_rank = row_number(desc(id))) %>%
  ungroup() %>%
  filter(line_rank == 1) %>%
  select(id, date, total) %>%
  group_by(id) %>%
  summarise(
    recent = datediff("2013-07-29", max(date)),
    freq = n(),
    money = mean(total)
  ) %>%
  collect() %>%
  as.data.frame()
head(cust)
## id recent freq money
## 1 454431700 127 194 43.593
## 2 475495035 31 81 17.900
## 3 3743178100 81 158 46.889
## 4 2514317045 43 26 80.253
## 5 473811043 131 94 32.827
## 6 2800652478 168 17 26.311
## Time difference of 1.5044 mins
# C. (品類、公司)對應矩陣 -- (Category, Company) Mapping Matrix ----
# Sparse category x company matrix: cell [r, c] holds the number of line
# items (rows of tx) for that (category, company) pair.
# Rows and columns are labelled with the category / company codes (the factor
# levels used to build the indices), so every cell can be mapped back to its
# pair. Previously the labels were dropped and only positions remained.
cat_com <- tx %>%
  group_by(category, company) %>%
  tally() %>%
  collect() %>%
  with({
    cat_f <- factor(category)
    com_f <- factor(company)
    sparseMatrix(
      i = as.integer(cat_f),
      j = as.integer(com_f),
      x = n,
      dimnames = list(levels(cat_f), levels(com_f))
    )
  })
dim(cat_com) # 836 32773
# How many companies have a column total of 1, 2, ... line items (first 10).
colSums(cat_com) %>% table %>% head(10)
## .
## 1 2 3 4 5 6 7 8 9 10
## 2541 1368 1079 861 729 531 509 445 402 375
## Time difference of 1.5656 mins
# Always remember to disconnect! ----
# PLEASE always remember to disconnect: close the Spark connection `sc`
# opened at the top of this script once all work is done.
spark_disconnect(sc = sc)