Anomaly Detection Model (Frequentist)

Nir Regev
Chief Data Scientist
Sisense Ltd.

July 28, 2016

# Setup ------------------------------------------------------------------
# Helper scripts below are sourced relative to this directory.
# NOTE(review): hard-coded path; consider making it configurable.
setwd("~/anomaly")
source("pkgTest.R")
# pkgTest() comes from pkgTest.R; judging by the messages below it loads each
# package (via require()). Each package is requested exactly once.
pkgTest("ggplot2")
## Loading required package: ggplot2
## Warning: package 'ggplot2' was built under R version 3.2.5
pkgTest("data.table")
## Loading required package: data.table
## Warning: package 'data.table' was built under R version 3.2.5
pkgTest("stringr")
## Loading required package: stringr
pkgTest("devtools")
## Loading required package: devtools
## Warning: package 'devtools' was built under R version 3.2.5
pkgTest("caret")
## Loading required package: caret
## Warning: package 'caret' was built under R version 3.2.5
## Loading required package: lattice
pkgTest("yaml")
## Loading required package: yaml
pkgTest("plyr")
## Loading required package: plyr
# dplyr is loaded after plyr so that dplyr's verbs mask plyr's (see below).
pkgTest("dplyr")
## Loading required package: dplyr
## Warning: package 'dplyr' was built under R version 3.2.5
## 
## Attaching package: 'dplyr'
## The following objects are masked from 'package:plyr':
## 
##     arrange, count, desc, failwith, id, mutate, rename, summarise,
##     summarize
## The following objects are masked from 'package:data.table':
## 
##     between, last
## The following objects are masked from 'package:stats':
## 
##     filter, lag
## The following objects are masked from 'package:base':
## 
##     intersect, setdiff, setequal, union
source("get_csv_from_folder.R")
source("readinteger.R")
source("ttest_analysis.R")
source("lm_cust.R")

Starting to prepare data

# Run-time settings come from conf.yml (read from the working directory).
config <- yaml.load_file("conf.yml")
days.back <- config$days.back                       # how many recent files/days to load
current.ts <- config$current.ts                     # optional analysis end date (may be NULL)
test.type <- config$test.type                       # statistical test selector
new.data.time.windows <- as.integer(config$window)  # length of the "new data" window

# Local folders for the raw query-duration CSVs and the widget outputs.
s3.location <- "~/anomaly/query-responsivness"
outputpath <- "~/anomaly//widget_outputs"

yaml

Reading data from directory (with multiple files)

# Read the most recent query-duration CSV files in a folder and stack them.
#
# Only files whose name ends in ".csv" and is exactly 34 characters long are
# considered; the alphabetically last `days.back` of them are read. Each file's
# first line (header) is skipped and the ten columns are given the standard
# names below.
#
# Args:
#   folder.name: directory containing the CSV files.
#   days.back:   number of (alphabetically last) files to read.
#
# Returns:
#   A data.frame with columns timestamp, action, cubeName, dashboard,
#   duration, host, sisenseVersion, sisenseuid, status, widget.
#   Unreadable files are skipped with a warning; stops with an error when no
#   file at all could be read.
get_csv_from_folder <- function(folder.name, days.back) {
  # Build full paths instead of calling setwd(), so the caller's working
  # directory is left untouched.
  files <- list.files(folder.name, pattern = "\\.csv$")
  files <- files[nchar(files) == 34]
  files <- tail(files, days.back)

  file.data.list <- lapply(files, function(f) {
    tryCatch(
      read.table(file.path(folder.name, f), header = FALSE, fill = TRUE,
                 sep = ",", quote = "\"", skip = 1, stringsAsFactors = FALSE),
      error = function(e) {
        warning("Skipping unreadable file ", f, ": ", conditionMessage(e),
                call. = FALSE)
        NULL
      }
    )
  })
  file.data.list <- Filter(Negate(is.null), file.data.list)
  if (length(file.data.list) == 0L) {
    stop("No readable CSV files found in ", folder.name, call. = FALSE)
  }

  # Base rbind() replaces dplyr::rbind_all(), which is deprecated/defunct in
  # current dplyr. With stringsAsFactors = FALSE there are no factor levels to
  # reconcile, so the "Unequal factor levels" warnings disappear.
  widget.data <- do.call(rbind, file.data.list)
  colnames(widget.data) <- c("timestamp", "action", "cubeName", "dashboard",
                             "duration", "host", "sisenseVersion",
                             "sisenseuid", "status", "widget")
  widget.data
}

# Load the raw query-duration CSVs and the widget alerts, timing each step with
# system.time() (the timings are autoprinted below each call).
print("Time to retrieve data...")
## [1] "Time to retrieve data..."
# get_csv_from_folder() reads the last `days.back` matching CSV files found in
# s3.location (a local folder, despite the name).
system.time(s3.duration.data <- get_csv_from_folder(s3.location,days.back))
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character

## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character

## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character

## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character

## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character

## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character

## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
##    user  system elapsed 
##  171.32    2.26  173.74
# get_build_alerts() is not defined in this document; presumably it loads the
# previously written alert files under outputpath (its result needs at least the
# columns "widget" and "mean.new", see the split step) — TODO confirm.
system.time(widgets.alerts <- get_build_alerts(outputpath))
## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character

## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character

## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character

## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character

## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character

## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character

## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character

## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character

## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character

## Warning in rbind_all(file.data.list): Unequal factor levels: coercing to
## character
##    user  system elapsed 
##    0.31    0.00    0.33

Cleaning Data and Key Preparation (for split)

# Normalise column names/types, restrict rows to the analysis window and build
# the per-widget grouping key (host|dashboard|widget).
names.data <- c("timestamp","action","cubeName","dashboard","duration","host","sisenseVersion","sisenseuid","status","widget")
# Rename only the ten raw columns, so that running this chunk again after the
# derived `key` column exists does not create a column named NA.
colnames(s3.duration.data)[seq_along(names.data)] <- names.data
s3.duration.data$timestamp <- as.Date(s3.duration.data$timestamp)
# Anchor the look-back window on the configured analysis date (current.ts) when
# one is set; measuring it from today would drop all data for back-tests on
# dates older than `days.back` days.
end.date <- if (!is.null(current.ts)) as.Date(current.ts) else Sys.Date()
startDate <- end.date - days.back
s3.duration.data <- subset(s3.duration.data, timestamp > startDate)
if (!is.null(current.ts)) {
  s3.duration.data <- subset(s3.duration.data, timestamp <= end.date)
}
s3.duration.data$duration <- as.numeric(as.character(s3.duration.data$duration))
s3.duration.data$key <- paste(s3.duration.data$host, s3.duration.data$dashboard,
                              s3.duration.data$widget, sep = "|")
# Remove rows whose dashboard or widget name is missing or blank.
has.dashboard <- !is.na(s3.duration.data$dashboard) & s3.duration.data$dashboard != ""
has.widget <- !is.na(s3.duration.data$widget) & s3.duration.data$widget != ""
s3.duration.data <- s3.duration.data[has.dashboard & has.widget, ]

Cleansing Data and Key Preparation (for split)

# NOTE(review): this chunk repeats the cleaning chunk above. It is written so
# that re-running it on already-cleaned data is harmless.
# Normalise column names/types, restrict rows to the analysis window and build
# the per-widget grouping key (host|dashboard|widget).
names.data <- c("timestamp","action","cubeName","dashboard","duration","host","sisenseVersion","sisenseuid","status","widget")
# Rename only the ten raw columns: the data already carries the derived `key`
# column here, and assigning 10 names to 11 columns would create an NA name.
colnames(s3.duration.data)[seq_along(names.data)] <- names.data
s3.duration.data$timestamp <- as.Date(s3.duration.data$timestamp)
# Anchor the look-back window on the configured analysis date (current.ts) when
# one is set, rather than on today.
end.date <- if (!is.null(current.ts)) as.Date(current.ts) else Sys.Date()
startDate <- end.date - days.back
s3.duration.data <- subset(s3.duration.data, timestamp > startDate)
if (!is.null(current.ts)) {
  s3.duration.data <- subset(s3.duration.data, timestamp <= end.date)
}
s3.duration.data$duration <- as.numeric(as.character(s3.duration.data$duration))
s3.duration.data$key <- paste(s3.duration.data$host, s3.duration.data$dashboard,
                              s3.duration.data$widget, sep = "|")
# Remove rows whose dashboard or widget name is missing or blank.
has.dashboard <- !is.na(s3.duration.data$dashboard) & s3.duration.data$dashboard != ""
has.widget <- !is.na(s3.duration.data$widget) & s3.duration.data$widget != ""
s3.duration.data <- s3.duration.data[has.dashboard & has.widget, ]

Split data

print("Time to split data (per widget) ...")
## [1] "Time to split data (per widget) ..."
# One data.table per widget key, built with data.table grouping (.GRP / .SD).
s3.duration.data.dt <- data.table(s3.duration.data)
s3.duration.data.dt[, grp := .GRP, by = key]  # `:=` adds grp by reference
setkey(s3.duration.data.dt, grp)
dt.split.time <- system.time(dashboards.groups.list <- s3.duration.data.dt[, list(list(.SD)), by = grp]$V1)
print(paste0("Data table split time :" , dt.split.time[3], "# of rows : ", nrow(s3.duration.data)/1000000, "M"))
## [1] "Data table split time :0.319999999999993# of rows : 3.08529M"
# Benchmark base split() on (at most) the first 100,000 rows. The grouping
# factor must come from the same rows: pairing the sampled rows with the full
# key vector mis-aligned rows and groups ("data length is not a multiple of
# split variable").
bench.rows <- seq_len(min(100000L, nrow(s3.duration.data)))
df.split.time <- system.time(s3.duration.data.split <-
                               split(x = s3.duration.data[bench.rows, ],
                                     f = as.factor(s3.duration.data$key[bench.rows])))
# benchmark (data.table timing covers all rows; split() only the sample)
print(paste0("dt split : ",dt.split.time[3], " vs. df split : ", df.split.time[3] ))
## [1] "dt split : 0.319999999999993 vs. df split : 26.29"
# split widget alerts by widget
widgets.alerts.dt <- data.table(widgets.alerts[,c("widget","mean.new")])
widgets.alerts.dt[, grp := .GRP, by = widget]
setkey(widgets.alerts.dt, grp)
system.time(widgets.alerts.groups.list <- widgets.alerts.dt[, list(list(.SD)), by = grp]$V1)
##    user  system elapsed 
##       0       0       0
# lm_cust_widget() (presumably from lm_cust.R) returns one trend per widget, or
# NULL; NULL results are dropped. vapply() keeps this safe for an empty list.
trend.list <- lapply(widgets.alerts.groups.list, FUN = lm_cust_widget)
trend.list <- trend.list[!vapply(trend.list, is.null, logical(1))]
if (length(trend.list) > 0L) {
  trend.df <- do.call(rbind.data.frame, trend.list)
  names(trend.df) <- c("widget", "trend_magnitude")
} else {
  # No trends: keep an empty frame with the expected columns for the later merge.
  trend.df <- data.frame(widget = character(), trend_magnitude = numeric(),
                         stringsAsFactors = FALSE)
}

#memory recycle
rm(list = c('s3.duration.data')) 
print(paste0("All in all ...", length(dashboards.groups.list), " widgets will be analyzed"))
## [1] "All in all ...49892 widgets will be analyzed"

Further pre-processing (skip)

# Empty alert table template, created directly with the 14 column names (and
# order) of the records produced by ttest_analysis().
alert.df <- data.frame(widget = character(),
                       DateRange = character(),
                       alertType = character(),
                       alertScore = integer(),
                       user = character(),
                       version = character(),
                       base.min.date = character(),
                       base.max.date = character(),
                       new.min.date = character(),
                       new.max.date = character(),
                       mean.base = numeric(),
                       sd.base = numeric(),
                       mean.new = numeric(),
                       sd.new = numeric())
#setwd("/home/sisense/Documents/anomaly/widget_outputs")
print("Time to filter data...")
## [1] "Time to filter data..."
# Keep only widgets with more than five rows.
system.time(cond <- vapply(dashboards.groups.list, function(x) nrow(x) > 5, logical(1)))
##    user  system elapsed 
##    1.21    0.00    1.21
system.time(dashboards.groups.list <- dashboards.groups.list[cond])
##    user  system elapsed 
##    0.01    0.00    0.01
# call t test analysis and visualizations
print("Time to run statistical analysis")
## [1] "Time to run statistical analysis"
# Analysis parameters from conf.yml; [[ ]] avoids `$`'s silent partial matching.
min.days <- config[["min.days"]]
new.data.time.windows <- config[["window"]]
min.data.points <- config[["min.data.points"]]
# The configuration chunk reads the key `test.type`, while this step used to
# read `test` (which only worked through partial matching). Accept either,
# preferring an exact `test` key.
test.type <- if (!is.null(config[["test"]])) config[["test"]] else config[["test.type"]]
min.duration <- config[["min.duration"]]

Calling t test (non-parametric)

1 2 3

# Run the per-widget test. By default only the first 100 widgets are analysed
# (an optional `max.widgets` entry in conf.yml overrides this). head() also
# copes with fewer widgets than the limit, whereas `[1:100]` would produce NULL
# entries. Arguments are passed by name so they cannot drift out of order
# relative to ttest_analysis()'s signature.
max.widgets <- if (is.null(config[["max.widgets"]])) 100 else config[["max.widgets"]]
system.time(alert.list <- lapply(head(dashboards.groups.list, max.widgets),
                                 ttest_analysis,
                                 new.data.window = new.data.time.windows,
                                 min.days = min.days,
                                 min.data.points = min.data.points,
                                 test.type = test.type,
                                 min.duration = min.duration))
## [1] "dashboard : 575ff178ed47b9181e000009;Advanced Threat"
## [1] "dashboard : 56d6eae6846be7c4a300199e;1. Daily Ticker"
## [1] "dashboard : 56d6eae6846be7c4a300199e;1. Daily Ticker"

## [1] "dashboard : 56d6eae6846be7c4a300199e;1. Daily Ticker"

## [1] "dashboard : 56d6eae6846be7c4a300199e;1. Daily Ticker"
## [1] "dashboard : 56d6eae6846be7c4a300199e;1. Daily Ticker"

## [1] "dashboard : 56d6eae6846be7c4a300199e;1. Daily Ticker"
## [1] "dashboard : 576952f255e36b9420000001;Advanced Threat"
## [1] "dashboard : 576952f255e36b9420000001;Advanced Threat"
## [1] "dashboard : 575ff178ed47b9181e000009;Advanced Threat"
## [1] "dashboard : 56d6eae6846be7c4a300199e;1. Daily Ticker"
## [1] "dashboard : 576952f255e36b9420000001;Advanced Threat"
## [1] "dashboard : 576952f255e36b9420000001;Advanced Threat"
## [1] "dashboard : 571a0fc151b7a330d9000763;Company Targets"
## [1] "dashboard : 571a0fc151b7a330d9000763;Company Targets"
## [1] "dashboard : 575ff178ed47b9181e000009;Advanced Threat"
## [1] "dashboard : 575ff178ed47b9181e000009;Advanced Threat"
## [1] "dashboard : 576952f255e36b9420000001;Advanced Threat"
## [1] "dashboard : 568280f17965440000000203;Screener"
## [1] "dashboard : 568280f17965440000000203;Screener"
## [1] "dashboard : 57472bd0aacf90cc01000491;Monthly Intervi"
## [1] "dashboard : 57472caeaacf90cc010004a7;Annual Intervie"
## [1] "dashboard : 57319e556cd36a0000000048;Real Time Busin"
## [1] "dashboard : 560bd022309adac0b60006f0;BI Project Metr"
## [1] "dashboard : 560bd022309adac0b60006f0;BI Project Metr"
## [1] "dashboard : 576952f255e36b9420000001;Advanced Threat"
## [1] "dashboard : 571a0fc151b7a330d9000763;Company Targets"
## [1] "dashboard : 575ff178ed47b9181e000009;Advanced Threat"
## [1] "dashboard : 571a0fc151b7a330d9000763;Company Targets"
## [1] "dashboard : 576952f255e36b9420000001;Advanced Threat"
## [1] "dashboard : 56d6eae6846be7c4a300199e;1. Daily Ticker"

## [1] "dashboard : 56d6eae6846be7c4a300199e;1. Daily Ticker"
## [1] "dashboard : 57319e556cd36a0000000048;Real Time Busin"
## [1] "dashboard : 56d6eae6846be7c4a300199e;1. Daily Ticker"
## [1] "dashboard : 576952f255e36b9420000001;Advanced Threat"
## [1] "dashboard : 572afc419259d29c1b001777;processing-meas"
## [1] "dashboard : 572afc419259d29c1b001777;processing-meas"
## [1] "dashboard : 57319e556cd36a0000000048;Real Time Busin"
## [1] "dashboard : 57319e556cd36a0000000048;Real Time Busin"
## [1] "dashboard : 57699b161c60e544680002b3;Basic Summary"
## [1] "dashboard : 560bd022309adac0b60006f0;BI Project Metr"
## [1] "dashboard : 57699b161c60e544680002b3;Basic Summary"
## [1] "dashboard : 571a0fc151b7a330d9000763;Company Targets"
## [1] "dashboard : 571a0fc151b7a330d9000763;Company Targets"
## [1] "dashboard : 57319e556cd36a0000000048;Real Time Busin"
## [1] "dashboard : 57319e556cd36a0000000048;Real Time Busin"
## [1] "dashboard : 576952f255e36b9420000001;Advanced Threat"
## [1] "dashboard : 5718f3ca5e3c0600000000b1;Company Targets"
## [1] "dashboard : 57699b161c60e544680002b3;Basic Summary"
## [1] "dashboard : 575ff178ed47b9181e000009;Advanced Threat"
## [1] "dashboard : 575ff178ed47b9181e000009;Advanced Threat"
## [1] "dashboard : 5718f3ca5e3c0600000000b1;Company Targets"
## [1] "dashboard : 5718f3ca5e3c0600000000b1;Company Targets"
## [1] "dashboard : 5718f3ca5e3c0600000000b1;Company Targets"
## [1] "dashboard : 5718f3ca5e3c0600000000b1;Company Targets"
## [1] "dashboard : 571a0fc151b7a330d9000763;Company Targets"
## [1] "dashboard : 571a0fc151b7a330d9000763;Company Targets"
## [1] "dashboard : 571a0fc151b7a330d9000763;Company Targets"
## [1] "dashboard : 5748327438f2e63413000059;Daily Phone Act"
## [1] "dashboard : 56d6eae6846be7c4a300199e;1. Daily Ticker"
## [1] "dashboard : 56d6eae6846be7c4a300199e;1. Daily Ticker"

## [1] "dashboard : 56d6eae6846be7c4a300199e;1. Daily Ticker"

## [1] "dashboard : 572afc419259d29c1b001777;processing-meas"
## [1] "dashboard : 572afc419259d29c1b001777;processing-meas"
## [1] "dashboard : 57319e556cd36a0000000048;Real Time Busin"
## [1] "dashboard : 571a0fc151b7a330d9000763;Company Targets"
## [1] "dashboard : 5718f3ca5e3c0600000000b1;Company Targets"
## [1] "dashboard : 575ff178ed47b9181e000009;Advanced Threat"
## [1] "dashboard : 55b2407bb13db0b012000013;Waypoint Histor"
## [1] "dashboard : 55b2407bb13db0b012000013;Waypoint Histor"
## [1] "dashboard : 575ff178ed47b9181e000009;Advanced Threat"
## [1] "dashboard : 56d6eae6846be7c4a300199e;1. Daily Ticker"
## [1] "dashboard : 56d6eae6846be7c4a300199e;1. Daily Ticker"
## [1] "dashboard : 560bd022309adac0b60006f0;BI Project Metr"
## [1] "dashboard : 571a0fc151b7a330d9000763;Company Targets"
## [1] "dashboard : 571a0fc151b7a330d9000763;Company Targets"
## [1] "dashboard : 571a0fc151b7a330d9000763;Company Targets"
## [1] "dashboard : 57471f83aacf90cc010003f9;Weekly Dialogue"
## [1] "dashboard : 5718f3ca5e3c0600000000b1;Company Targets"
## [1] "dashboard : 568280f17965440000000203;Screener"
## [1] "dashboard : 55b2407bb13db0b012000013;Waypoint Histor"
## [1] "dashboard : 57699b161c60e544680002b3;Basic Summary"
## [1] "dashboard : 57319e556cd36a0000000048;Real Time Busin"
## [1] "dashboard : 57699b161c60e544680002b3;Basic Summary"
##    user  system elapsed 
##    5.36    1.51    6.89

ttest analysis

# Test one widget's query durations for a recent deterioration.
#
# The widget's history is split into a baseline period and a "new" period. A
# one-sided two-sample test then checks whether new durations are larger than
# baseline ones: Welch's t-test when test.type == "parametric", otherwise the
# Wilcoxon rank-sum test. A separate "Query Failed" record is produced when any
# query in the new period has status "failed".
#
# Args:
#   df: data.frame/data.table for a single widget; uses the columns timestamp,
#     duration, dashboard, key, sisenseVersion, sisenseuid and status.
#   new.data.window: number of most recent days forming the new period. When
#     NULL or <= 0, the last 20% of rows (ordered by date) form the new period.
#   min.days: minimum number of distinct timestamps required.
#   min.data.points: minimum number of non-NA durations required.
#   test.type: "parametric" selects the t-test; any other value the Wilcoxon test.
#   min.duration: skip the widget when the new-period mean duration is below this.
#   output.dir: directory made the working directory before a result is built
#     (the calling script later writes its CSV to the working directory);
#     NULL leaves the working directory unchanged.
#   alpha.parametric, alpha.nonparametric: significance levels of the two tests.
#
# Returns:
#   NULL when the widget is skipped or nothing is flagged; otherwise a
#   14-element character vector (one alert record), or a list of two such
#   vectors (deterioration record first, failed-query record second).
ttest_analysis <- function(df,
                           new.data.window = NULL,
                           min.days = 7,
                           min.data.points = 30,
                           test.type,
                           min.duration,
                           output.dir = "/home/sisense/Documents/anomaly/widget_outputs",
                           alpha.parametric = 0.01,
                           alpha.nonparametric = 0.05) {
  df <- data.frame(df)
  df <- df[!is.na(df$duration), , drop = FALSE]
  if (nrow(df) < min.data.points ||
      length(unique(as.POSIXct(df$timestamp))) < min.days) {
    return(NULL)
  }

  # Identification fields come from the first row / first distinct value.
  # as.character() guards against factor columns, which c() would otherwise
  # turn into integer codes in older R versions.
  dash.name <- substr(as.character(df$dashboard[1]), 1, 40)
  key.name <- as.character(df$key[1])
  version <- as.character(unique(df$sisenseVersion)[[1]])
  user <- as.character(unique(df$sisenseuid)[[1]])
  print(paste("dashboard : ", dash.name, sep = ""))

  # Divide the data into baseline and new periods.
  day <- as.Date(df$timestamp)
  if (!is.null(new.data.window) && new.data.window > 0) {
    # new.data.window counts days. Subtracting it from a POSIXlt timestamp (as
    # earlier code did) shifts by seconds and leaves only the last day as "new".
    cutoff.date <- max(day, na.rm = TRUE) - new.data.window
    baseline.df <- df[!is.na(day) & day <= cutoff.date, , drop = FALSE]
    newdates.df <- df[!is.na(day) & day > cutoff.date, , drop = FALSE]
    if (nrow(newdates.df) < 1.5 * new.data.window ||
        nrow(baseline.df) < 3 * new.data.window) {
      return(NULL)
    }
  } else {
    # Chronological 80/20 split: the latest 20% of rows form the new period.
    df <- df[order(day), , drop = FALSE]
    in.new <- seq_len(nrow(df)) > ceiling(nrow(df) * 0.8)
    baseline.df <- df[!in.new, , drop = FALSE]
    newdates.df <- df[in.new, , drop = FALSE]
  }

  # Too little new data or (near-)constant durations make the test meaningless.
  # These checks discard the whole widget (including any failed-query record),
  # so they can run before any record is built.
  if (nrow(newdates.df) < 3 ||
      length(nearZeroVar(newdates.df$duration)) > 0 ||
      length(nearZeroVar(baseline.df$duration)) > 0) {
    return(NULL)
  }

  # Summary statistics (means, not medians) of both periods.
  mean.base <- mean(baseline.df$duration)
  mean.new <- mean(newdates.df$duration)
  if (mean.new < min.duration) {
    return(NULL)
  }
  sd.base <- sd(baseline.df$duration)
  sd.new <- sd(newdates.df$duration)

  # One-sided test: H1 is "baseline durations are smaller than new durations".
  if (isTRUE(test.type == "parametric")) {
    p.value <- t.test(baseline.df$duration,
                      newdates.df$duration,
                      alternative = "less",
                      var.equal = FALSE,
                      paired = FALSE)$p.value
    alpha <- alpha.parametric
  } else {
    p.value <- wilcox.test(baseline.df$duration,
                           newdates.df$duration,
                           alternative = "less",
                           paired = FALSE)$p.value
    alpha <- alpha.nonparametric
  }

  if (!is.null(output.dir)) {
    setwd(output.dir)
  }

  new.days <- as.Date(newdates.df$timestamp)
  base.days <- as.Date(baseline.df$timestamp)
  date.range <- paste0("from ", min(new.days), " to ", max(new.days))

  alert.record <- NULL
  if (isTRUE(p.value < alpha)) {  # isTRUE: a NaN p-value is "not significant"
    alert.record <- c(key.name,
                      date.range,
                      "Query_Duration_Deterioration",
                      p.value,
                      user,
                      version,
                      as.character(min(base.days)),
                      as.character(max(base.days)),
                      as.character(min(new.days)),
                      as.character(max(new.days)),
                      mean.base,
                      sd.base,
                      mean.new,
                      sd.new)
  }

  # %in% is NA-safe, unlike any(status == "failed") inside if().
  failed.record <- NULL
  if ("failed" %in% newdates.df$status) {
    failed.record <- c(key.name, date.range, "Query Failed", "", user, version,
                       rep("", 8))
  }

  records <- Filter(Negate(is.null), list(alert.record, failed.record))
  if (length(records) == 0L) {
    return(NULL)
  }
  if (length(records) == 1L) records[[1L]] else records
}

Post processing alert results

# Post-process the per-widget results into a single alert table.
rm(list = c('dashboards.groups.list'))
# ttest_analysis() returns NULL, one 14-field record, or a list of two records
# (deterioration + failed query). Flatten to one record per element so widgets
# that produce both alerts are not silently dropped by the length filter.
alert.list <- alert.list[!vapply(alert.list, is.null, logical(1))]
alert.records <- unlist(lapply(alert.list, function(x) if (is.list(x)) x else list(x)),
                        recursive = FALSE)
if (is.null(alert.records)) alert.records <- list()
alert.records <- alert.records[vapply(alert.records, length, integer(1)) == 14L]
alert.cols <- c("widget","DateRange","alertType","alertScore","user","version",
                "base.min.date","base.max.date","new.min.date","new.max.date",
                "mean.base","sd.base","mean.new","sd.new")
alert.matrix <- if (length(alert.records) > 0L) {
  do.call(rbind, alert.records)
} else {
  matrix(character(0), nrow = 0, ncol = length(alert.cols))
}
alert.df.clean <- as.data.frame(alert.matrix, stringsAsFactors = FALSE)
names(alert.df.clean) <- alert.cols
# alertScore holds the test p-value; failed-query records carry "" (-> NA).
alert.df.clean$alertScore <- as.numeric(as.character(alert.df.clean$alertScore))
# Min-max rescale so that 1 = strongest alert (smallest p-value). When all
# scores are equal the rescaling is undefined (0/0); score them all 1.
scores <- alert.df.clean$alertScore
if (any(!is.na(scores))) {
  score.min <- min(scores, na.rm = TRUE)
  score.span <- max(scores, na.rm = TRUE) - score.min
  alert.df.clean$NormalizedAlertScore <- if (score.span > 0) {
    1 - (scores - score.min) / score.span
  } else {
    ifelse(is.na(scores), NA_real_, 1)
  }
} else {
  alert.df.clean$NormalizedAlertScore <- rep(NA_real_, nrow(alert.df.clean))
}
alert.df.clean$alertScore <- NULL

# joining alerts and trends (widget is the only shared column; made explicit)
alert.df.clean.trend <- merge(x = alert.df.clean, y = trend.df, by = "widget", all.x = TRUE)
# NOTE(review): "Detriorating" (sic) is kept byte-for-byte in case downstream
# consumers match on it.
alert.df.clean.trend$trend_direction <- ifelse(alert.df.clean.trend$trend_magnitude > 0, "Detriorating", "Improving")

#print("alerts output sample:")
#alert.df.clean.trend[1:100,]
print("writing alerts csv file locally...")
## [1] "writing alerts csv file locally..."
# Date the file by the analysis date: current.ts when configured, else today.
# A plain if/else keeps the Date class (ifelse() strips it, which forced the
# earlier as.Date(..., origin = "1970-01-01") round trip).
print.date <- if (!is.null(current.ts)) as.Date(current.ts) else Sys.Date()
# write.csv() always writes column names; passing col.names only raised a
# warning. NOTE(review): the file goes to the current working directory, which
# ttest_analysis() may have changed via setwd().
write.csv(x = alert.df.clean.trend,
          file = paste0(print.date, "_testtype_", test.type, "_widgets_alerts.csv"),
          row.names = FALSE)
print("writing alerts csv file to aws s3")
## [1] "writing alerts csv file to aws s3"