Library Import

library(dplyr)      # data manipulation
library(plotly)     # interactive plots
library(tidyr)      # data tidying
library(lubridate)  # date-time helpers
library(ggplot2)    # static graphics
library(lattice)    # trellis graphics (caret dependency)
library(caret)      # preprocessing and model training
library(mlr)        # machine learning framework (multi-target classification)
library(reticulate) # R-Python interoperability
library(questionr)  # frequency tables via freq()
library(digest)     # hashing utilities
library(sparklyr)   # R interface to Apache Spark

Files Unzip and Data Import

### Files Unzip ###
#setwd("F:/R Projects/KKstream Project/")
#untar("datasets.tgz,list=TRUE")
#untar("datasets.tgz")
#untar("public.tar.gz")

#Data Import to Cloud Area
datatemp = list.files(pattern="data")
labeltemp = list.files(pattern="label")
#Extract the 3-digit file numbers from both sets of filenames
data_number = substr(datatemp,6,8)
label_number = substr(labeltemp,7,9)
intersect_number = intersect(data_number, label_number)
#Find the file numbers that appear in both the data and label sets
intersect_number
##  [1] "003" "005" "007" "008" "009" "011" "015" "016" "017" "020" "021"
## [12] "022" "025" "026" "032" "034" "035" "041" "044"
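
As a side note, the fixed substr() positions above assume filenames like data_003.csv and label_003.csv. Under that same assumption, a regex-based sketch would extract the numbers without relying on character positions:

# A sketch: pull the first 3-digit run out of each filename.
# Note regmatches() silently drops filenames with no 3-digit match.
data_number_alt = regmatches(datatemp, regexpr("[0-9]{3}", datatemp))
label_number_alt = regmatches(labeltemp, regexpr("[0-9]{3}", labeltemp))
intersect(data_number_alt, label_number_alt)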

Spark Setting

# Spark configuration for handling large data in memory
{conf <- spark_config() 
conf$`sparklyr.shell.driver-memory` <- "16G"
conf$spark.memory.fraction <- 0.9
sc <- spark_connect(master = "local", 
                    version = "2.4.3",
                    config = conf)}
## Registered S3 method overwritten by 'openssl':
##   method      from
##   print.bytes Rcpp
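
As an optional sanity check, the connection can be queried for the Spark version it actually runs:

# Optional: confirm the connection is up and on the requested version
spark_version(sc)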

Upload dataframes to Spark

# Data_df upload to Spark
datadfquerylist = list()
for(i in 1:length(intersect_number)) {
  intername=datatemp[grepl(pattern = intersect_number[i],datatemp)]
  cloudfilename = paste0("data_",substr(intername,6,8))
  localfileaddress = paste0("./",intername)
  datadfquerylist[[i]] = spark_read_csv(sc,name=cloudfilename, path=localfileaddress)
}
# label_df upload to Spark
labeldfquerylist = list()
for(i in 1:length(intersect_number)) {
  intername=labeltemp[grepl(pattern = intersect_number[i],labeltemp)]
  cloudfilename = paste0("label_",substr(intername,7,9))
  localfileaddress = paste0("./",intername)
  labeldfquerylist[[i]] = spark_read_csv(sc,name=cloudfilename, path=localfileaddress)
}
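
To verify that every table landed in Spark, the registered table names can be listed; src_tbls() is a dplyr generic that works on a sparklyr connection:

# List the tables registered on the Spark connection,
# e.g. "data_003", "label_003", ...
src_tbls(sc)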

Unique User_id list

userid_list = list()
for (i in 1:length(datadfquerylist)) {
  temlist = as.list(as_tibble(datadfquerylist[[i]] %>% select(user_id)) %>% unique())
  userid_list = append(userid_list, temlist)
}
unique_id = as.numeric(unlist(userid_list))
print(paste("Number of ID",':',length(unique_id)))
## [1] "Number of ID : 24003"
print("ID List as below: ")
## [1] "ID List as below: "
head(unique_id,5)
## [1] 2542 2543 2544 2545 2546
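
As an aside, the deduplication could also be pushed down to Spark before collecting, so only distinct ids travel to R. A sketch (note that an extra unique() across files would change the count above if a user appears in more than one file):

# A sketch: deduplicate user_ids inside Spark, then collect
userid_list2 = list()
for (i in seq_along(datadfquerylist)) {
  userid_list2[[i]] = datadfquerylist[[i]] %>%
    distinct(user_id) %>%   # executed on the Spark side
    collect() %>%
    pull(user_id)
}
length(unlist(userid_list2))  # per-file unique ids, as above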

Data Analysis

- In this part, I chose data_003 for a micro-level view. Within it, I picked user_id = 2542 for behavior evaluation.

- Introduction of Day_time_period_label:

> Following the KKstream Kaggle competition, I devised a labeling strategy based on the hour of day. The label has four levels: 0 for hours 1-8, 1 for hours 9-16, 2 for hours 17-20, and 3 for hours 21-24 (i.e. 21-23 and hour 0).

datadfquerylist[[1]] %>% glimpse() 
## Observations: ??
## Variables: 12
## Database: spark_connection
## $ user_id                     <int> 2542, 2542, 2542, 2542, 2542, 2542...
## $ device_id                   <int> 5419, 5419, 5420, 5420, 5420, 5421...
## $ session_id                  <int> 256570, 256570, 256571, 256571, 25...
## $ title_id                    <int> 73, 73, 73, 73, 73, 56, 56, 56, 56...
## $ event_time                  <dttm> 2017-05-18 05:08:20, 2017-05-18 0...
## $ played_duration             <int> 17, 2, 4, 1, 2, 13, 1, 188, 805, 2...
## $ action_trigger              <chr> "seek", "leave", "next episode", "...
## $ platform                    <chr> "iOS", "iOS", "iOS", "iOS", "iOS",...
## $ episode_number              <int> 4, 4, 4, 5, 6, 27, 27, 27, 27, 27,...
## $ series_total_episodes_count <int> 4, 4, 18, 18, 18, 51, 51, 51, 51, ...
## $ internet_connection_type    <chr> "wifi", "wifi", "wifi", "wifi", "w...
## $ is_trailer                  <lgl> FALSE, FALSE, FALSE, FALSE, FALSE,...
`2542_df` = as_tibble(datadfquerylist[[1]] %>% filter(user_id == 2542))
{`2542_df` = `2542_df` %>%
  mutate(`Hours` = hour(as.POSIXct(event_time,"%Y-%m-%d %H:%M:%S", tz = "")),
         `Minutes` = minute(as.POSIXct(event_time,"%Y-%m-%d %H:%M:%S", tz = "")),
         `Seconds` = second(as.POSIXct(event_time,"%Y-%m-%d %H:%M:%S", tz = "")),
         Dates = format(as.Date(event_time), format = "%Y-%m-%d"),
         Day_time_period_label = ifelse(`Hours` %in% 1:8, 0,
                                        ifelse(`Hours` %in% 9:16,1,
                                               ifelse(`Hours` %in% 17:20,2,
                                                      ifelse(`Hours` %in% c(21:24,0),3,NA)))))}
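
The same hour-to-label mapping reads more clearly with dplyr::case_when; this sketch is equivalent to the nested ifelse above (hour() returns 0-23, so 24 never actually occurs):

# Equivalent labeling written with case_when
`2542_df` = `2542_df` %>%
  mutate(Day_time_period_label = case_when(
    Hours %in% 1:8         ~ 0,  # period 0: 01-08
    Hours %in% 9:16        ~ 1,  # period 1: 09-16
    Hours %in% 17:20       ~ 2,  # period 2: 17-20
    Hours %in% c(21:23, 0) ~ 3,  # period 3: 21-24, i.e. 21-23 and hour 0
    TRUE                   ~ NA_real_))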

Brainstorming for advertisement marketing:

- First, I selected several levels of action_trigger, namely 'pause', 'seek', 'next episode', and 'video ended'.

- Second, I filtered this customer's historical records down to period 0 (Day_time_period_label == 0).

- Third, within that period, I found the hour with the highest record frequency.

In short, this strategy lets us roughly detect a user's peak usage period.

# Filter to period 0 and the critical action triggers
critical_at = c('pause','seek','next episode','video ended')
smallDtp = `2542_df` %>% 
  filter(Day_time_period_label == 0 & action_trigger %in% critical_at)
#Calculate the highest-frequency hour
hour_freq = freq(factor(smallDtp$Hours))
hfHour = hour_freq %>% 
  mutate(hour_label = row.names(hour_freq)) %>%
  filter(n == max(n)) %>% select(hour_label)
plot_ly(data = smallDtp, x=~factor(Hours), type='histogram') %>%
  layout(title='High Frequency for Hours',
         yaxis=list(title='Count'))
smallDtp = smallDtp %>% filter(Hours == as.numeric(hfHour))
head(smallDtp,5)
## # A tibble: 5 x 17
##   user_id device_id session_id title_id event_time          played_duration
##     <int>     <int>      <int>    <int> <dttm>                        <int>
## 1    2542      5421     256579       56 2017-07-07 07:21:54               7
## 2    2542      5421     256579       56 2017-07-07 07:25:03             187
## 3    2542      5421     256579       56 2017-07-07 07:26:20              75
## 4    2542      5421     256579       56 2017-07-07 07:26:47              26
## 5    2542      5421     256579       56 2017-07-07 07:26:55               3
## # ... with 11 more variables: action_trigger <chr>, platform <chr>,
## #   episode_number <int>, series_total_episodes_count <int>,
## #   internet_connection_type <chr>, is_trailer <lgl>, Hours <int>,
## #   Minutes <int>, Seconds <dbl>, Dates <chr>, Day_time_period_label <dbl>
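
As a side note, the same highest-frequency hour can be obtained with plain dplyr instead of questionr::freq(); a sketch (assuming dplyr >= 1.0 for slice_max()), recomputed from the records before the Hours filter above:

# dplyr-only equivalent of the freq()-based hour pick
hfHour_alt = `2542_df` %>%
  filter(Day_time_period_label == 0 & action_trigger %in% critical_at) %>%
  count(Hours) %>%          # frequency of each hour
  slice_max(n, n = 1) %>%   # keep the most frequent hour(s)
  pull(Hours)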

Exploration for title_id:

- First, I chose title_id = 74 as a sample.

- Second, I found the highest-frequency episode_number and analyzed the customer's behavior.

- Third, I kept only records whose played_duration is below one third of the maximum. Those records are assumed to capture the moments when the customer hesitated and was still searching for a target.

smallDtp = smallDtp %>% filter(title_id == 74)
#Calculate the highest-frequency episode_number
epi_freq = freq(factor(smallDtp$episode_number))
hfepi = epi_freq %>% 
  mutate(epi_label = row.names(epi_freq)) %>%
  filter(n == max(n)) %>% select(epi_label)

smallDtp = smallDtp %>%
  filter(episode_number == as.numeric(hfepi)) %>%
  filter(played_duration < (max(played_duration)/3))

final_smallDtp = smallDtp %>% filter(Hours == as.numeric(hfHour))

Small Report:

It shows the customer's peak usage hour for seeking items. In addition, within that peak period, it reports the highest-frequency episode_number for the chosen title_id.

head(final_smallDtp %>% select(user_id,title_id,episode_number,Hours,Day_time_period_label),1)
## # A tibble: 1 x 5
##   user_id title_id episode_number Hours Day_time_period_label
##     <int>    <int>          <int> <int>                 <dbl>
## 1    2542       74             13     7                     0

Further Strategy:

Before model training, I will preprocess the data with standardization and PCA, and validate with 10-fold cross-validation. After that, I will train a multi-target classification algorithm from the mlr package. Finally, I will run the test dataset through the model and use a confusion matrix to evaluate performance.
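
A minimal sketch of that plan with mlr, assuming a prepared feature table train_df whose logical columns y1 and y2 are the targets (all of these names are hypothetical placeholders). Binary relevance is one plain option among mlr's multilabel methods; the PCA step and the final confusion-matrix evaluation are omitted here:

# Sketch only: multilabel task with standardization and 10-fold CV
task  = makeMultilabelTask(data = train_df, target = c("y1", "y2"))
task  = normalizeFeatures(task, method = "standardize")  # standardization
lrn   = makeMultilabelBinaryRelevanceWrapper(makeLearner("classif.rpart"))
rdesc = makeResampleDesc("CV", iters = 10)               # 10-fold CV
res   = resample(lrn, task, rdesc, measures = list(multilabel.hamloss))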