# Nir Regev
# Lead Data Scientist
# Sisense Ltd.
# July 31st, 2016
# Setup ----
# NOTE(review): hard-coded working directory; the relative source()/file
# paths below depend on it.
setwd("~/anomaly")
# pkgTest() is loaded from pkgTest.R; the commented-out definition below
# presumably mirrors it (load a package, installing it first if missing).
# pkgTest <- function(x)
# {
# if (!require(x,character.only = TRUE))
# {
# install.packages(x,dep=TRUE,repos = 'http://star-www.st-andrews.ac.uk/cran/')
# if(!require(x,character.only = TRUE)) stop("Package not found")
# }
# }
source("pkgTest.R")

# Each package is requested exactly once. plyr is loaded before dplyr so that
# dplyr's verbs mask plyr's, as the attach messages below show.
pkgTest("data.table")
## Loading required package: data.table
## Warning: package 'data.table' was built under R version 3.2.5
pkgTest("stringr")
## Loading required package: stringr
pkgTest("devtools")
## Loading required package: devtools
## Warning: package 'devtools' was built under R version 3.2.5
pkgTest("caret")
## Loading required package: caret
## Warning: package 'caret' was built under R version 3.2.5
## Loading required package: lattice
## Loading required package: ggplot2
## Warning: package 'ggplot2' was built under R version 3.2.5
pkgTest("yaml")
## Loading required package: yaml
pkgTest("plyr")
## Loading required package: plyr
pkgTest("dplyr")
## Loading required package: dplyr
## Warning: package 'dplyr' was built under R version 3.2.5
##
## Attaching package: 'dplyr'
## The following objects are masked from 'package:plyr':
##
## arrange, count, desc, failwith, id, mutate, rename, summarise,
## summarize
## The following objects are masked from 'package:data.table':
##
## between, last
## The following objects are masked from 'package:stats':
##
## filter, lag
## The following objects are masked from 'package:base':
##
## intersect, setdiff, setequal, union

# Project helper scripts.
source("get_csv_from_folder.R")
source("readinteger.R")
source("ttest_analysis.R")
source("lm_cust.R")

# Synthetic illustration ----
# 100 "baseline" widget durations followed by 30 "new" ones whose mean is
# shifted upward (0.42 -> 0.61). The second plot marks the change point.
base <- rnorm(n = 100, mean = 0.42, sd = 0.1)
new <- rnorm(n = 30, mean = 0.61, sd = 0.1)
widget_duration <- c(base, new)
time <- seq_along(widget_duration)
df <- data.frame(time, widget_duration)
print(ggplot(df, aes(x = time, y = widget_duration)) + geom_line())
# The red dashed line sits at the last baseline observation.
print(
  ggplot(df, aes(x = time, y = widget_duration)) + geom_line() +
    geom_vline(xintercept = length(base), colour = "red",
               linetype = "longdash", size = 2)
)
# Configuration ----
# Run parameters come from conf.yml. `[[` is used rather than `$` so that a
# missing key yields NULL instead of silently partial-matching a longer key.
config <- yaml.load_file("conf.yml")
new.data.time.windows <- as.integer(config[["window"]])
test.type <- config[["test.type"]]
current.ts <- config[["current.ts"]]
days.back <- config[["days.back"]]
# Folder holding the raw query-duration CSV exports.
s3.location <- "~/anomaly/query-responsivness"
outputpath <- "~/anomaly//widget_outputs"

#' Read the most recent CSV exports in a folder into one data frame.
#'
#' Only files whose names end in ".csv" and are exactly 34 characters long are
#' used. They are taken in `list.files()` (alphabetical) order, and the last
#' `days.back` of them are read. The first line of each file is skipped.
#' Every column is read as character so files can always be stacked; callers
#' convert types (e.g. duration) themselves. The caller's working directory is
#' not changed.
#'
#' @param folder.name Directory containing the CSV exports.
#' @param days.back Number of files to read, counted from the end of the
#'   name-sorted list.
#' @return A data frame with columns timestamp, action, cubeName, dashboard,
#'   duration, host, sisenseVersion, sisenseuid, status and widget.
#'   Stops with an error when no matching file could be read.
get_csv_from_folder <- function(folder.name, days.back) {
  names.data <- c("timestamp", "action", "cubeName", "dashboard", "duration",
                  "host", "sisenseVersion", "sisenseuid", "status", "widget")

  # `pattern` is a regular expression, so anchor on the literal ".csv" suffix.
  files <- list.files(folder.name, pattern = "\\.csv$", full.names = TRUE)
  # Export files have fixed-length (34 character) names. Presumably they start
  # with a "%Y.%m.%d" date, so name order is date order -- confirm.
  files <- files[nchar(basename(files)) == 34]
  files <- tail(files, days.back)
  if (length(files) == 0) {
    stop("No matching CSV files found in ", folder.name, call. = FALSE)
  }
  # Optional staleness check (disabled), kept for reference:
  # last.date <- as.Date(substr(basename(tail(files, 1)), 1, 10), format = "%Y.%m.%d")
  # if (abs(as.numeric(difftime(last.date, Sys.Date(), units = "days"))) > 1) return(NULL)

  file.data.list <- lapply(files, function(path) {
    tryCatch(
      read.table(path, header = FALSE, fill = TRUE, sep = ",", quote = "\"",
                 skip = 1, colClasses = "character"),
      error = function(e) {
        # Best effort: skip unreadable files, but say so.
        warning("Skipping unreadable file ", path, ": ", conditionMessage(e),
                call. = FALSE)
        NULL
      }
    )
  })
  # bind_rows() drops the NULL entries left by unreadable files.
  widget.data <- bind_rows(file.data.list)
  if (ncol(widget.data) == 0) {
    stop("None of the CSV files in ", folder.name, " could be read",
         call. = FALSE)
  }
  colnames(widget.data) <- names.data
  widget.data
}
# Data retrieval ----
print("Time to retrieve data...")
## [1] "Time to retrieve data..."

# Raw per-query duration records from the exports folder (timed).
system.time(
  s3.duration.data <- get_csv_from_folder(folder.name = s3.location,
                                          days.back = days.back)
)
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## user system elapsed
## 118.22 1.43 120.27

# Previously produced widget alerts. get_build_alerts() is not defined in
# this file (presumably in one of the sourced helpers).
system.time(
  widgets.alerts <- get_build_alerts(outputpath)
)
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
## user system elapsed
## 0.12 0.00 0.19
# Data cleaning ----
names.data <- c("timestamp", "action", "cubeName", "dashboard", "duration",
                "host", "sisenseVersion", "sisenseuid", "status", "widget")
colnames(s3.duration.data) <- names.data
# Keep only the calendar date of each record.
s3.duration.data$timestamp <- as.Date(s3.duration.data$timestamp)
#startDate <- Sys.Date() - days.back
#s3.duration.data <- subset(s3.duration.data,timestamp > startDate)
# Non-numeric durations become NA.
s3.duration.data$duration <- as.numeric(as.character(s3.duration.data$duration))
# One analysis unit per host|dashboard|widget combination.
s3.duration.data$key <- paste(s3.duration.data$host,
                              s3.duration.data$dashboard,
                              s3.duration.data$widget,
                              sep = "|")
# When a reference date is configured, drop records after it.
if (!is.null(current.ts)) {
  s3.duration.data <- subset(s3.duration.data, timestamp <= as.Date(current.ts))
}
# Remove rows with a blank (NA or empty) dashboard or widget name.
s3.duration.data <- s3.duration.data[
  !with(s3.duration.data,
        is.na(dashboard) | dashboard == "" | is.na(widget) | widget == ""),
]
print("Time to split data (per widget) ...")
## [1] "Time to split data (per widget) ..."
# Per-widget split ----
# Number each host|dashboard|widget key (grp), then collect each group's rows
# (.SD, i.e. every column except grp) as one list element.
# The := result is re-assigned (rather than left bare) so it is never
# auto-printed.
s3.duration.data.dt <- data.table(s3.duration.data)
s3.duration.data.dt <- s3.duration.data.dt[, grp := .GRP, by = "key"]
setkey(s3.duration.data.dt, grp)
dt.split.time <- system.time(
  dashboards.groups.list <- s3.duration.data.dt[, list(list(.SD)), by = "grp"]$V1
)
print(paste0("Data table split time :", dt.split.time[3], "# of rows : ",
             nrow(s3.duration.data) / 1000000, "M"))
## [1] "Data table split time :0.340000000000003# of rows : 4.231741M"

# Benchmark: the same split with base split() on at most the first 1M rows.
# This is slow (minutes). x and f are taken from the same rows so each row is
# assigned to its own key. An earlier run passed every key as f; that emitted
# "data length is not a multiple of split variable" and mis-assigned rows.
n.bench <- min(1000000L, nrow(s3.duration.data))
bench.rows <- seq_len(n.bench)
df.split.time <- system.time(
  s3.duration.data.split <- split(x = s3.duration.data[bench.rows, ],
                                  f = as.factor(s3.duration.data$key[bench.rows]))
)
# Note: the data.table split covers all rows, the base split at most 1M.
print(paste0("dt split : ", dt.split.time[3], " vs. df split : ", df.split.time[3]))
## [1] "dt split : 0.340000000000003 vs. df split : 280.93"

# Split widget alerts by widget ----
widgets.alerts.dt <- data.table(widgets.alerts[, c("widget", "mean.new")])
widgets.alerts.dt <- widgets.alerts.dt[, grp := .GRP, by = "widget"]
setkey(widgets.alerts.dt, grp)
system.time(
  widgets.alerts.groups.list <- widgets.alerts.dt[, list(list(.SD)), by = "grp"]$V1
)
## user system elapsed
## 0 0 0

# Per-widget trend. lm_cust_widget() is not defined in this file (presumably
# in lm_cust.R). NULL results are dropped; Filter() also copes with an empty
# list, where !sapply(list(), is.null) would error.
trend.list <- lapply(widgets.alerts.groups.list, FUN = lm_cust_widget)
trend.list <- Filter(Negate(is.null), trend.list)
trend.df <- if (length(trend.list) > 0) {
  do.call(rbind.data.frame, trend.list)
} else {
  data.frame(widget = character(), trend_magnitude = numeric())
}
names(trend.df) <- c("widget", "trend_magnitude")

# Free memory: the raw table is no longer needed.
rm(list = c("s3.duration.data"))
print(paste0("All in all ...", length(dashboards.groups.list), " widgets will be analyzed"))
## [1] "All in all ...74468 widgets will be analyzed"
# Empty alert table template ----
# 14 columns, matching the alert records built by ttest_analysis().
alert.df <- data.frame(
  widget = character(),
  DateRange = character(),
  alertType = character(),
  alertScore = integer(),
  user = character(),
  version = character(),
  base.min.date = character(),
  base.max.date = character(),
  new.min.date = character(),
  new.max.date = character(),
  mean.base = numeric(),
  sd.base = numeric(),
  mean.new = numeric(),
  sd.new = numeric()
)

#setwd("/home/sisense/Documents/anomaly/widget_outputs")
print("Time to filter data...")
## [1] "Time to filter data..."
# Keep only widget groups with more than 5 records.
system.time(
  cond <- lapply(dashboards.groups.list, FUN = function(group) nrow(group) > 5)
)
## user system elapsed
## 1.09 0.00 1.10
system.time(
  dashboards.groups.list <- dashboards.groups.list[unlist(cond)]
)
## user system elapsed
## 0 0 0
# call t test analysis and visualizations
print("Time to run statistical analysis")
## [1] "Time to run statistical analysis"

# Analysis parameters. These assignments override the values of
# new.data.time.windows and test.type read when conf.yml was first loaded.
min.days <- config[["min.days"]]
new.data.time.windows <- config[["window"]]
min.data.points <- config[["min.data.points"]]
# The test type is read from a "test" key here and from "test.type" when the
# config is first loaded. Prefer an exact "test" entry, otherwise fall back to
# "test.type" (what `config$test` reached through partial matching).
test.type <- if (!is.null(config[["test"]])) config[["test"]] else config[["test.type"]]
min.duration <- config[["min.duration"]]

# Only the first 100 widget groups are analysed (demo subset). head() avoids
# padding with NULL entries when fewer groups exist, as [1:100] would.
n.analyzed <- 100L
system.time(
  alert.list <- lapply(head(dashboards.groups.list, n.analyzed),
                       ttest_analysis,
                       new.data.window = new.data.time.windows,
                       min.days = min.days,
                       min.data.points = min.data.points,
                       test.type = test.type,
                       min.duration = min.duration)
)
## [1] "dashboard : 56d718927f3986c81000021e;Fail Rate"
## [1] "dashboard : 56d718927f3986c81000021e;Fail Rate"
## [1] "dashboard : 56d718927f3986c81000021e;Fail Rate"
## [1] "dashboard : 56f0223fe5b82b8cd9000009;SellPro_Deck"
## [1] "dashboard : 56f0223fe5b82b8cd9000009;SellPro_Deck"
## [1] "dashboard : 56f0223fe5b82b8cd9000009;SellPro_Deck"
## [1] "dashboard : 56f0223fe5b82b8cd9000009;SellPro_Deck"
## [1] "dashboard : 56d718927f3986c81000021e;Fail Rate"
## [1] "dashboard : 5759b6da40215e2c9000000d;SellPro Master "
## [1] "dashboard : 56f0223fe5b82b8cd9000009;SellPro_Deck"
## [1] "dashboard : 56f0223fe5b82b8cd9000009;SellPro_Deck"
## [1] "dashboard : 5759b6da40215e2c9000000d;SellPro Master "
## [1] "dashboard : 56a88cf678cebec490002f78;02 - The Sport "
## [1] "dashboard : 5698fbaa0ec09ce40c000094;1 - Today's Hou"
## [1] "dashboard : 563b6bf1422dd27c50002c50;NOC Monitor Gro"
## [1] "dashboard : 568280f17965440000000203;Screener"
## [1] "dashboard : 56f0223fe5b82b8cd9000009;SellPro_Deck"
## [1] "dashboard : 5759b6da40215e2c9000000d;SellPro Master "
## [1] "dashboard : 5759b6da40215e2c9000000d;SellPro Master "
## [1] "dashboard : 572afc419259d29c1b001777;processing-meas"
## [1] "dashboard : 572afc419259d29c1b001777;processing-meas"
## [1] "dashboard : 56f2e43ebf2c03a8d700001d;Lifeline Master"
## [1] "dashboard : 56f2e43ebf2c03a8d700001d;Lifeline Master"
## [1] "dashboard : 573db60d03c7eb00000009ab;Parts Summary"
## [1] "dashboard : 568280f17965440000000203;Screener"
## [1] "dashboard : 56f2e43ebf2c03a8d700001d;Lifeline Master"
## [1] "dashboard : 56d718927f3986c81000021e;Fail Rate"
## [1] "dashboard : 56f2e43ebf2c03a8d700001d;Lifeline Master"
## [1] "dashboard : 568280f17965440000000203;Screener"
## [1] "dashboard : 56a91a2a3676c1e438000585;DC - Health"
## [1] "dashboard : 569c8486ed6c48cc0e0003b0;3 - MTD Sales a"
## [1] "dashboard : 569c8486ed6c48cc0e0003b0;3 - MTD Sales a"
## [1] "dashboard : 572afc419259d29c1b001777;processing-meas"
## [1] "dashboard : 56e04a39a3b8a1b005000001;Spring AT&T DS "
## [1] "dashboard : 56e04a39a3b8a1b005000001;Spring AT&T DS "
## [1] "dashboard : 563b6bf1422dd27c50002c50;NOC Monitor Gro"
## [1] "dashboard : 563b6bf1422dd27c50002c50;NOC Monitor Gro"
## [1] "dashboard : 556720bb94057700000008cc;Part Sales - Em"
## [1] "dashboard : 56a91a2a3676c1e438000585;DC - Health"
## [1] "dashboard : 56a91a2a3676c1e438000585;DC - Health"
## [1] "dashboard : 572afc419259d29c1b001777;processing-meas"
## [1] "dashboard : 56e04a39a3b8a1b005000001;Spring AT&T DS "
## [1] "dashboard : 56cb11e9c61bc534640002f6;NYC_dash_data"
## [1] "dashboard : 57240b1afac3ed040600080f;SP_Canon_Conten"
## [1] "dashboard : 57240b1afac3ed040600080f;SP_Canon_Conten"
## [1] "dashboard : 568280f17965440000000203;Screener"
## [1] "dashboard : 572a4b709e491ed80d001631;SP_Canon_Awards"
## [1] "dashboard : 55b2407bb13db0b012000013;Waypoint Histor"
## [1] "dashboard : 56cb11e9c61bc534640002f6;NYC_dash_data"
## [1] "dashboard : 56cb11e9c61bc534640002f6;NYC_dash_data"
## [1] "dashboard : 56cb11e9c61bc534640002f6;NYC_dash_data"
## [1] "dashboard : 56c23a3c3f1a38000000087f;User Logins"
## [1] "dashboard : 568280f17965440000000203;Screener"
## [1] "dashboard : 568396de7b33cb000000010a;2.2 Trend By Ch"
## [1] "dashboard : 572afc419259d29c1b001777;processing-meas"
## [1] "dashboard : 5759b6da40215e2c9000000d;SellPro Master "
## [1] "dashboard : 5759b6da40215e2c9000000d;SellPro Master "
## [1] "dashboard : 56f2e43ebf2c03a8d700001d;Lifeline Master"
## [1] "dashboard : 5759b6da40215e2c9000000d;SellPro Master "
## [1] "dashboard : 568163fd18534d000000189d;4.1 Trend By Su"
## [1] "dashboard : 55b2407bb13db0b012000013;Waypoint Histor"
## [1] "dashboard : 55b2407bb13db0b012000013;Waypoint Histor"
## [1] "dashboard : 56c23a3c3f1a38000000087f;User Logins"
## [1] "dashboard : 568396de7b33cb000000010a;2.2 Trend By Ch"
## [1] "dashboard : 568396de7b33cb000000010a;2.2 Trend By Ch"
## [1] "dashboard : 5683a6907b33cb0000000230;4.2 Trend By Su"
## [1] "dashboard : 569278f167f35800000003b9;3.2 Trend By Co"
## [1] "dashboard : 573a249aff520cf862002faf;Ilan's Team Q2 "
## [1] "dashboard : 573a249aff520cf862002faf;Ilan's Team Q2 "
## [1] "dashboard : 573a249aff520cf862002faf;Ilan's Team Q2 "
## [1] "dashboard : 569278f167f35800000003b9;3.2 Trend By Co"
## [1] "dashboard : 56927a6667f358000000043b;3.4 Trend By Co"
## [1] "dashboard : 55b24180b13db0b01200003f;HT Dashboard"
## [1] "dashboard : 568146a618534d00000013e8;3.1 Trend By Co"
## [1] "dashboard : 568146a618534d00000013e8;3.1 Trend By Co"
## user system elapsed
## 1.29 0.03 1.34
#' Test whether a widget's recent query durations got worse than its history.
#'
#' One widget's records are split into a baseline period and a "new" period.
#' A one-sided two-sample test then checks whether new durations are larger.
#'
#' @param df Records of a single widget (coerced with data.frame()). Columns
#'   used: timestamp, duration, dashboard, key, sisenseVersion, sisenseuid,
#'   status.
#' @param new.data.window Length of the new period in days, counted back from
#'   the latest date. NULL, NA or a value <= 0 selects the fallback split: the
#'   last 20% of the records (ordered by date) form the new period.
#' @param min.days Minimum number of distinct dates required.
#' @param min.data.points Minimum number of records with a non-NA duration.
#' @param test.type "parametric" runs a Welch t-test (significant when
#'   p < 0.01). Any other value runs a Wilcoxon rank-sum test (p < 0.05).
#' @param min.duration Skip the widget when its mean new-period duration is
#'   below this value.
#' @return NULL when there is nothing to report. Otherwise a 14-element
#'   character record ("Query_Duration_Deterioration" or "Query Failed"), or a
#'   list of both records when both apply.
ttest_analysis <- function(df,
                           new.data.window = NULL,
                           min.days = 7,
                           min.data.points = 30,
                           test.type = "parametric",
                           min.duration = 0) {
  if (is.null(df)) {
    return(NULL)
  }
  df <- data.frame(df)
  new.record.failed <- NULL
  new.record <- NULL

  # Enough usable data? `||` short-circuits, so the date check never sees an
  # empty frame.
  df <- df[!is.na(df$duration), , drop = FALSE]
  if (nrow(df) < min.data.points ||
      length(unique(as.Date(df$timestamp))) < min.days) {
    return(NULL)
  }

  # Identifying fields, taken from the widget's records. as.character() makes
  # factor columns contribute their labels (not integer codes) to c() below.
  dash.name <- substr(df[["dashboard"]][1], start = 0, stop = 40)
  key.name <- as.character(df[["key"]][1])
  version <- as.character(unique(df$sisenseVersion)[[1]])
  user <- as.character(unique(df$sisenseuid)[[1]])
  print(paste0("dashboard : ", dash.name))

  # Split into historical (baseline) and new data.
  if (isTRUE(new.data.window > 0)) {
    # Work in Date units so the window is measured in days; subtracting a
    # number from a POSIXlt/POSIXct time would subtract seconds instead.
    df$timestamp <- as.Date(df$timestamp)
    cutoff.date <- max(df$timestamp) - new.data.window
    baseline.df <- df[df$timestamp <= cutoff.date, , drop = FALSE]
    newdates.df <- df[df$timestamp > cutoff.date, , drop = FALSE]
    # Require enough records in both periods relative to the window length.
    if (nrow(newdates.df) < 1.5 * new.data.window ||
        nrow(baseline.df) < 3 * new.data.window) {
      return(NULL)
    }
  } else {
    # Fallback: the last 20% of the records (by date) form the new period.
    df.sorted <- df[order(as.Date(df$timestamp)), , drop = FALSE]
    num.row <- nrow(df.sorted)
    baseline.id <- ceiling(num.row * 0.8)
    # Nothing left for the new period (only possible for tiny groups).
    if (baseline.id >= num.row) {
      return(NULL)
    }
    baseline.df <- df.sorted[seq_len(baseline.id), c("duration", "timestamp")]
    newdates.df <- df.sorted[(baseline.id + 1):num.row,
                             c("duration", "timestamp", "status")]
  }
  minDate <- min(as.Date(newdates.df$timestamp))
  maxDate <- max(as.Date(newdates.df$timestamp))
  minDate.baseline <- min(as.Date(baseline.df$timestamp))
  maxDate.baseline <- max(as.Date(baseline.df$timestamp))
  date.range <- paste0("from ", minDate, " to ", maxDate)

  # Any failed query in the new period? %in% treats NA status as not failed.
  if (any(newdates.df$status %in% "failed")) {
    new.record.failed <- c(key.name, date.range, "Query Failed", "", user,
                           version, rep("", 8))
  }

  # Too little new data, or (near-)constant durations: nothing to test.
  if (nrow(newdates.df) < 3 ||
      length(nearZeroVar(newdates.df$duration)) > 0 ||
      length(nearZeroVar(baseline.df$duration)) > 0) {
    return(NULL)
  }

  # Means and standard deviations of both periods.
  mean.base <- mean(baseline.df$duration)
  mean.new <- mean(newdates.df$duration)
  if (isTRUE(mean.new < min.duration)) {
    return(NULL)
  }
  sd.base <- sd(baseline.df$duration)
  sd.new <- sd(newdates.df$duration)

  # One-sided test of "new durations are larger than baseline". The two
  # periods are independent samples of different sizes, so the tests are
  # unpaired (a paired test needs equal-length, matched samples).
  if (identical(test.type, "parametric")) {
    p.value <- t.test(baseline.df$duration,
                      newdates.df$duration,
                      alternative = "less",
                      var.equal = FALSE,
                      paired = FALSE)$p.value
    significance.result <- isTRUE(p.value < 0.01)
  } else { # non-parametric (Wilcoxon rank-sum test)
    p.value <- wilcox.test(baseline.df$duration,
                           newdates.df$duration,
                           alternative = "less",
                           paired = FALSE)$p.value
    significance.result <- isTRUE(p.value < 0.05)
  }

  # NOTE(review): changes the global working directory to a hard-coded path.
  # The calling script presumably relies on this, since it writes its alerts
  # CSV with a relative file name.
  setwd("/home/sisense/Documents/anomaly/widget_outputs")

  if (significance.result) {
    new.record <- c(key.name,
                    date.range,
                    "Query_Duration_Deterioration",
                    p.value,
                    user,
                    version,
                    as.character(minDate.baseline),
                    as.character(maxDate.baseline),
                    as.character(minDate),
                    as.character(maxDate),
                    mean.base,
                    sd.base,
                    mean.new,
                    sd.new)
  }

  if (!is.null(new.record) && !is.null(new.record.failed)) {
    return(list(new.record, new.record.failed))
  }
  if (!is.null(new.record.failed)) {
    return(new.record.failed)
  }
  new.record
}

# Free the per-widget groups.
rm(list = c("dashboards.groups.list"))
# Collect alerts ----
# Each ttest_analysis() result is NULL (no alert), one 14-field record, or a
# list of two records (deterioration + failed query).
# Filter() is needed here: alert.list[!is.null(alert.list)] tests the list
# itself and removes nothing.
alert.list <- Filter(Negate(is.null), alert.list)
# Flatten so widgets that produced both records keep both, instead of being
# dropped by the length filter below.
alert.records <- do.call(c, lapply(alert.list, function(res) {
  if (is.list(res)) res else list(res)
}))
alert.list.clean <- alert.records[vapply(alert.records, length, integer(1)) == 14]

alert.columns <- c("widget", "DateRange", "alertType", "alertScore", "user", "version",
                   "base.min.date", "base.max.date", "new.min.date", "new.max.date",
                   "mean.base", "sd.base", "mean.new", "sd.new")
# Records are character vectors; stack them as rows of a character table.
alert.df.clean <- if (length(alert.list.clean) > 0) {
  as.data.frame(do.call(rbind, alert.list.clean), stringsAsFactors = FALSE)
} else {
  as.data.frame(matrix(character(0), nrow = 0, ncol = length(alert.columns)),
                stringsAsFactors = FALSE)
}
names(alert.df.clean) <- alert.columns

# alertScore holds the p-value (empty for "Query Failed" records, hence NA).
# Rescale so the smallest p-value maps to 1 and the largest to 0.
alert.df.clean$alertScore <- as.numeric(as.character(alert.df.clean$alertScore))
scores <- alert.df.clean$alertScore
known.scores <- scores[!is.na(scores)]
if (length(known.scores) > 0 && max(known.scores) > min(known.scores)) {
  alert.df.clean$NormalizedAlertScore <-
    1 - (scores - min(known.scores)) / (max(known.scores) - min(known.scores))
} else {
  # At most one distinct score: avoid 0/0 and give every scored alert 1.
  alert.df.clean$NormalizedAlertScore <- ifelse(is.na(scores), NA_real_, 1)
}
alert.df.clean$alertScore <- NULL

# Left-join the per-widget trend on the shared "widget" column.
alert.df.clean.trend <- merge(x = alert.df.clean, y = trend.df,
                              by = "widget", all.x = TRUE)
# NOTE(review): "Detriorating" (sic) is kept as-is because downstream
# consumers may match on this exact value.
alert.df.clean.trend$trend_direction <- ifelse(alert.df.clean.trend$trend_magnitude > 0,
                                               "Detriorating", "Improving")

# Write ----
print("writing alerts csv file locally...")
print.date <- if (!is.null(current.ts)) as.Date(current.ts) else Sys.Date()
# write.csv() always writes a header row (it ignores col.names with a
# warning). The file name is relative to the current working directory.
write.csv(x = alert.df.clean.trend,
          file = paste0(print.date, "_testtype_", test.type, "_widgets_alerts.csv"),
          row.names = FALSE)
print("writing alerts csv file to aws s3")