library(dplyr)
library(plotly)
library(tidyr)
library(lubridate)
library(ggplot2)
library(lattice)
library(caret)
library(mlr)
library(reticulate)
library(questionr)
library(digest)
library(sparklyr)
### Unzip Files ###
#setwd("F:/R Projects/KKstream Project/")
#untar("datasets.tgz", list = TRUE)
#untar("datasets.tgz")
#untar("public.tar.gz")
# Collect the local data and label file names for import into Spark
datatemp = list.files(pattern = "data")
labeltemp = list.files(pattern = "label")
# Extract the three-digit file number from each file name
# ("data_NNN..." and "label_NNN...")
data_number = substr(datatemp, 6, 8)
label_number = substr(labeltemp, 7, 9)
# Keep only the numbers that have both a data file and a label file
intersect_number = intersect(data_number, label_number)
intersect_number
## [1] "003" "005" "007" "008" "009" "011" "015" "016" "017" "020" "021"
## [12] "022" "025" "026" "032" "034" "035" "041" "044"
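# Aside (sketch, not part of the original workflow): the substr() offsets
# above assume names exactly like "data_003.csv" / "label_003.csv". A regex
# that pulls the first three-digit run is robust to naming changes:
extract_number = function(filenames) {
  regmatches(filenames, regexpr("[0-9]{3}", filenames))
}
# extract_number(datatemp)   # e.g. "003" "005" "007" ...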
# Spark setup: raise the driver memory so the large CSVs fit in local mode
{conf <- spark_config()
 conf$`sparklyr.shell.driver-memory` <- "16G"
 conf$spark.memory.fraction <- 0.9
 sc = spark_connect(master = "local",
                    version = "2.4.3",
                    config = conf)}
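# Sanity check (sketch): confirm the connection is live and running the
# requested Spark version before uploading anything.
spark_version(sc)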
# Upload each matched data file to Spark as a named table
datadfquerylist = list()
for (i in 1:length(intersect_number)) {
  intername = datatemp[data_number == intersect_number[i]]
  cloudfilename = paste0("data_", substr(intername, 6, 8))
  localfileaddress = paste0("./", intername)
  datadfquerylist[[i]] = spark_read_csv(sc, name = cloudfilename, path = localfileaddress)
}
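# Note (sketch): spark_read_csv() caches each table in Spark memory by
# default (memory = TRUE); if the files outgrow the driver memory set
# above, caching can be disabled per table:
# spark_read_csv(sc, name = cloudfilename, path = localfileaddress, memory = FALSE)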
# Upload each matched label file to Spark as a named table
labeldfquerylist = list()
for (i in 1:length(intersect_number)) {
  intername = labeltemp[label_number == intersect_number[i]]
  cloudfilename = paste0("label_", substr(intername, 7, 9))
  localfileaddress = paste0("./", intername)
  labeldfquerylist[[i]] = spark_read_csv(sc, name = cloudfilename, path = localfileaddress)
}
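# Optional check (sketch): list the tables now registered in Spark;
# expect a "data_NNN" and a "label_NNN" entry for every matched number.
src_tbls(sc)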
# Collect the distinct user IDs from every data table
userid_list = list()
for (i in 1:length(datadfquerylist)) {
  templist = as.list(as_tibble(datadfquerylist[[i]] %>% select(user_id)) %>% unique())
  userid_list = append(userid_list, templist)
}
unique_id = as.numeric(unlist(userid_list))
print(paste("Number of ID",':',length(unique_id)))
## [1] "Number of ID : 24003"
print("ID List as below: ")
## [1] "ID List as below: "
head(unique_id,5)
## [1] 2542 2543 2544 2545 2546
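# The unique() above de-duplicates per file only, not across files; a
# quick check (sketch) that the IDs are also globally unique:
length(unique(unique_id)) == length(unique_id)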
datadfquerylist[[1]] %>% glimpse()
## Observations: ??
## Variables: 12
## Database: spark_connection
## $ user_id <int> 2542, 2542, 2542, 2542, 2542, 2542...
## $ device_id <int> 5419, 5419, 5420, 5420, 5420, 5421...
## $ session_id <int> 256570, 256570, 256571, 256571, 25...
## $ title_id <int> 73, 73, 73, 73, 73, 56, 56, 56, 56...
## $ event_time <dttm> 2017-05-18 05:08:20, 2017-05-18 0...
## $ played_duration <int> 17, 2, 4, 1, 2, 13, 1, 188, 805, 2...
## $ action_trigger <chr> "seek", "leave", "next episode", "...
## $ platform <chr> "iOS", "iOS", "iOS", "iOS", "iOS",...
## $ episode_number <int> 4, 4, 4, 5, 6, 27, 27, 27, 27, 27,...
## $ series_total_episodes_count <int> 4, 4, 18, 18, 18, 51, 51, 51, 51, ...
## $ internet_connection_type <chr> "wifi", "wifi", "wifi", "wifi", "w...
## $ is_trailer <lgl> FALSE, FALSE, FALSE, FALSE, FALSE,...
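# Aside (sketch): the as_tibble() call below implicitly collects from
# Spark; the same step with an explicit collect() would read
# `2542_df` = datadfquerylist[[1]] %>% filter(user_id == 2542) %>% collect()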
`2542_df` = as_tibble(datadfquerylist[[1]] %>% filter(user_id == 2542))
# Derive time-of-day features and a four-level day-period label
{`2542_df` = `2542_df` %>%
  mutate(Hours   = hour(as.POSIXct(event_time, format = "%Y-%m-%d %H:%M:%S", tz = "")),
         Minutes = minute(as.POSIXct(event_time, format = "%Y-%m-%d %H:%M:%S", tz = "")),
         Seconds = second(as.POSIXct(event_time, format = "%Y-%m-%d %H:%M:%S", tz = "")),
         Dates   = format(as.Date(event_time), format = "%Y-%m-%d"),
         # 0 = early morning (01-08), 1 = daytime (09-16),
         # 2 = evening (17-20), 3 = late night (21-24 and 00)
         Day_time_period_label = ifelse(Hours %in% 1:8, 0,
                                 ifelse(Hours %in% 9:16, 1,
                                 ifelse(Hours %in% 17:20, 2,
                                 ifelse(Hours %in% c(21:24, 0), 3, NA)))))}
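# Aside (sketch): the nested ifelse() can be written more flatly with
# dplyr::case_when(); hour() never returns 24, so 21:23 plus 0 covers
# the last period.
period_label = function(h) {
  case_when(h %in% 1:8         ~ 0,
            h %in% 9:16        ~ 1,
            h %in% 17:20       ~ 2,
            h %in% c(21:23, 0) ~ 3)
}
period_label(0:23)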
# Restrict to day period 0 (01:00-08:00) and to the "critical" action triggers
critical_at = c('pause','seek','next episode','video ended')
smallDtp = `2542_df` %>%
  filter(Day_time_period_label == 0 & action_trigger %in% critical_at)
# Find the hour with the highest event frequency (questionr::freq)
hour_freq = freq(factor(smallDtp$Hours))
hfHour = hour_freq %>%
  mutate(hour_label = row.names(hour_freq)) %>%
  filter(n == max(n)) %>%
  pull(hour_label)
plot_ly(data = smallDtp, x = ~factor(Hours), type = 'histogram') %>%
  layout(title = 'High Frequency for Hours',
         yaxis = list(title = 'Count'))
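# Equivalent static plot with ggplot2 (sketch; ggplot2 is already loaded):
# ggplot(smallDtp, aes(x = factor(Hours))) +
#   geom_bar() +
#   labs(title = 'High Frequency for Hours', x = 'Hours', y = 'Count')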
# Keep only the rows in the most frequent hour
smallDtp = smallDtp %>% filter(Hours == as.numeric(hfHour))
head(smallDtp,5)
## # A tibble: 5 x 17
## user_id device_id session_id title_id event_time played_duration
## <int> <int> <int> <int> <dttm> <int>
## 1 2542 5421 256579 56 2017-07-07 07:21:54 7
## 2 2542 5421 256579 56 2017-07-07 07:25:03 187
## 3 2542 5421 256579 56 2017-07-07 07:26:20 75
## 4 2542 5421 256579 56 2017-07-07 07:26:47 26
## 5 2542 5421 256579 56 2017-07-07 07:26:55 3
## # ... with 11 more variables: action_trigger <chr>, platform <chr>,
## # episode_number <int>, series_total_episodes_count <int>,
## # internet_connection_type <chr>, is_trailer <lgl>, Hours <int>,
## # Minutes <int>, Seconds <dbl>, Dates <chr>, Day_time_period_label <dbl>
# Restrict to a single title of interest (title_id 74)
smallDtp = smallDtp %>% filter(title_id == 74)
# Find the most frequently watched episode number
epi_freq = freq(factor(smallDtp$episode_number))
hfepi = epi_freq %>%
  mutate(epi_label = row.names(epi_freq)) %>%
  filter(n == max(n)) %>%
  pull(epi_label)
smallDtp = smallDtp %>%
  filter(episode_number == as.numeric(hfepi)) %>%
  # Drop unusually long plays: keep durations under a third of the maximum
  filter(played_duration < (max(played_duration)/3))
final_smallDtp = smallDtp %>% filter(Hours == as.numeric(hfHour))
head(final_smallDtp %>% select(user_id, title_id, episode_number, Hours, Day_time_period_label), 1)
## # A tibble: 1 x 5
## user_id title_id episode_number Hours Day_time_period_label
## <int> <int> <int> <int> <dbl>
## 1 2542 74 13 7 0
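# The steps above are hard-coded for user 2542. A reusable wrapper is
# sketched below (assumed helper, not part of the original analysis): it
# takes a collected per-user tibble that already has the derived Hours and
# Day_time_period_label columns, and returns the rows from that user's
# most frequent hour within a chosen day period.
busiest_hour_rows = function(user_df, period = 0,
                             triggers = c('pause','seek','next episode','video ended')) {
  sub = user_df %>%
    filter(Day_time_period_label == period & action_trigger %in% triggers)
  top_hour = sub %>%
    count(Hours) %>%            # event count per hour
    filter(n == max(n)) %>%     # keep the busiest hour(s), ties included
    pull(Hours)
  sub %>% filter(Hours %in% top_hour)
}
# Example: reproduce the user-2542 subset derived above
# busiest_hour_rows(`2542_df`, period = 0)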