# Start from an empty global environment. `all.names = TRUE` also removes
# objects whose names begin with a dot; the argument is spelled out in full
# instead of relying on partial matching of `all`.
rm(list = ls(all.names = TRUE))

# Print numbers with 5 significant digits and bias the output towards fixed
# notation rather than scientific notation.
options(digits = 5, scipen = 10)
library(Matrix)
library(dplyr) # load before sparklyr
library(sparklyr) # the dplyr in spark
# Connect to Spark Context ----
# Tell sparklyr where the Spark 2.1.0 installation lives.
Sys.setenv(SPARK_HOME = "/usr/local/spark/spark-2.1.0-bin-hadoop2.7/")

# Start from the default sparklyr configuration and override memory settings.
config <- spark_config()
# config$spark.ui.port = "4044"
config$spark.executor.memory <- "4G"
config$spark.driver.memory <- "4G"
# NOTE(review): this is a YARN-specific key, but the master used below is a
# standalone "spark://" URL -- confirm this setting is actually needed.
config$spark.yarn.executor.memoryOverhead <- "4096"

# Open the connection; `sc` is reused below to read data and to disconnect.
sc <- spark_connect(
  master = "spark://hnamenode:7077",
  config = config
)
# Read Parquet ----
# Record the start time so the elapsed time can be printed afterwards.
t0 <- Sys.time()

# Base HDFS location that the Parquet path is built from.
acquire <- "hdfs://192.168.1.100:9000/home/tonychuo/acquire/"

# Register the Parquet data at <acquire>pq01/TX as the Spark table "tx" and
# keep a reference to it in `tx`.
tx <- spark_read_parquet(
  sc,
  "tx",
  paste0(acquire, "pq01/TX")
)
# tbl_cache(sc, "tx")

# Print how long the read took.
Sys.time() - t0
## Time difference of 57.088 secs
# A. 資料探索 (Data exploration) ----
## # Source: lazy query [?? x 1]
## # Database: spark_connection
## n
## <dbl>
## 1 349655789
## # Source: lazy query [?? x 12]
## # Database: spark_connection
## id chain date dept category company brand size unit qty
## <dbl> <int> <date> <int> <int> <dbl> <int> <dbl> <chr> <int>
## 1 86246 205 2012-07-12 37 3703 1.05e8 2820 10.8 OZ 1
## 2 86246 205 2012-07-12 63 6399 1.02e8 16431 32 OZ 1
## 3 86246 205 2012-07-12 97 9753 1.02e9 0 1 CT 1
## 4 86246 205 2012-07-12 58 5827 1.04e8 14162 16 OZ 2
## 5 86246 205 2012-07-12 53 5307 1.01e8 15266 12 OZ 1
## 6 86246 205 2012-07-12 97 9753 1.02e9 0 1 CT 1
## # ... with 2 more variables: amount <dbl>, tid <int>
# Count the distinct values in columns 1, 2, 4-7 and 12 (id, chain, dept,
# category, company, brand and tid according to the captured output below),
# then pull the one-row result into a local data.frame for printing.
tx %>%
  summarise_at(c(1, 2, 4:7, 12), n_distinct) %>%
  collect() %>%
  data.frame()
## id chain dept category company brand tid
## 1 311541 134 83 836 32773 35689 26496798
# Earliest (d0) and latest (d1) value of `date` in the table.
# NOTE(review): the original inline note recorded 2012-03-02 2013-07-28, but
# the captured output below shows 2012-03-01 2013-07-27 -- confirm which is
# correct (a time-zone shift is one possible cause).
tx %>%
  summarise(
    d0 = min(date),
    d1 = max(date)
  )
## # Source: lazy query [?? x 2]
## # Database: spark_connection
## d0 d1
## <date> <date>
## 1 2012-03-01 2013-07-27
## Time difference of 1.2404 mins
# B. 資料彙整 (Data aggregation, RFM) ----
# Build a per-customer RFM table in Spark and bring it into local memory.
#   Step 1: within each `tid` (presumably a transaction id -- TODO confirm)
#           compute the summed amount and number the rows, so that exactly one
#           row per `tid` is kept.
#   Step 2: per customer `id` derive
#             recent = datediff between "2013-07-29" and the latest date,
#             freq   = number of kept rows (one per `tid`),
#             money  = mean of the per-`tid` totals.
# `datediff` is not an R function; presumably it is passed through to Spark
# SQL -- TODO confirm.
cust <- tx %>%
  group_by(tid) %>%
  mutate(
    total = sum(amount),
    row_in_tx = row_number(desc(id))
  ) %>%
  ungroup() %>%
  filter(row_in_tx == 1) %>%
  select(id, date, total) %>%
  group_by(id) %>%
  summarise(
    recent = datediff("2013-07-29", max(date)),
    freq = n(),
    money = mean(total)
  ) %>%
  collect() %>%
  data.frame()

# Preview the first rows of the local result.
head(cust)
## id recent freq money
## 1 2748521390 101 158 50.526
## 2 2741367879 131 57 75.290
## 3 3044234508 130 37 43.418
## 4 1930696286 131 94 67.905
## 5 677849520 31 340 18.400
## 6 1815683682 127 93 65.992
## Time difference of 1.651 mins
# C. (品類、公司)對應矩陣 ((Category, company) mapping matrix) ----
# Cross-tabulate category against company: count the rows of `tx` for every
# (category, company) pair in Spark, collect the counts, and store them in a
# sparse matrix with one row per category and one column per company (both
# indexed by their factor level codes).
cat_com <- tx %>%
  group_by(category, company) %>%
  tally() %>%
  collect() %>%
  with(
    sparseMatrix(
      i = as.integer(factor(category)),
      j = as.integer(factor(company)),
      x = n
    )
  )
dim(cat_com) # 836 32773
## [1] 836 32773
# Frequency table of the per-company column sums, first 10 entries.
# NOTE(review): the matrix cells hold row counts, so each column sum is the
# total number of rows for that company, not the number of categories it
# appears in (that would need `cat_com > 0`) -- confirm which was intended.
colSums(cat_com) %>%
  table() %>%
  head(10)
## .
## 1 2 3 4 5 6 7 8 9 10
## 2541 1368 1079 861 729 531 509 445 402 375
## Time difference of 1.7077 mins
# Always Remember to Disconnect !!! ----
# PLEASE always remember to disconnect!
# Closes the Spark connection `sc` that was opened earlier in this script.
spark_disconnect(sc)