library(dplyr)
## 
## Attaching package: 'dplyr'
## The following objects are masked from 'package:stats':
## 
##     filter, lag
## The following objects are masked from 'package:base':
## 
##     intersect, setdiff, setequal, union
library(ggplot2)
library(RPostgres)
library(zoo)
## 
## Attaching package: 'zoo'
## The following objects are masked from 'package:base':
## 
##     as.Date, as.Date.numeric
library(reshape2)
library(randomForest)
## randomForest 4.6-14
## Type rfNews() to see new features/changes/bug fixes.
## 
## Attaching package: 'randomForest'
## The following object is masked from 'package:ggplot2':
## 
##     margin
## The following object is masked from 'package:dplyr':
## 
##     combine
library(gridExtra)
## 
## Attaching package: 'gridExtra'
## The following object is masked from 'package:randomForest':
## 
##     combine
## The following object is masked from 'package:dplyr':
## 
##     combine
# ---- Database connection ---------------------------------------------------
# Connect to the Redshift cluster that holds the churn tables.
# The password is read from the REDSHIFT_PASSWORD environment variable (e.g.
# set in ~/.Renviron) instead of being hard-coded: a password committed to a
# script is readable by everyone who can see the file.
# NOTE(review): the previously hard-coded password should be treated as
# leaked and rotated.
if (!nzchar(Sys.getenv("REDSHIFT_PASSWORD"))) {
  stop("Set the REDSHIFT_PASSWORD environment variable before connecting.",
       call. = FALSE)
}
pconn_r <- dbConnect(
  RPostgres::Postgres(),
  dbname = "dev",
  host = "redshift-cluster-2.c3s1z6vmjxf5.ap-southeast-1.redshift.amazonaws.com",
  port = 5439,
  user = "awsuser",
  password = Sys.getenv("REDSHIFT_PASSWORD")
)
# List the tables visible through this connection.
dbListTables(pconn_r)
## [1] "dim_calendar"       "dim_product"        "dim_territory"     
## [4] "fact_sales_revenue" "dim_account"        "churn_analysis"
# Lazy reference to the churn_analysis table. Printing it runs a preview
# query, which shows whether the table contains data.
churn_analysis <- dplyr::tbl(pconn_r, "churn_analysis")
churn_analysis
## # Source:   table<churn_analysis> [?? x 30]
## # Database: postgres
## #   [awsuser@redshift-cluster-2.c3s1z6vmjxf5.ap-southeast-1.redshift.amazonaws.com:5439/dev]
##    territory_sid payer_account_id month_id product_sid sales_revenue region
##    <chr>         <chr>               <int> <chr>               <dbl> <chr> 
##  1 TS0000000000~ PA0000000000012~   201504 PHS0000000~          5.25 APAC ~
##  2 TS0000000000~ PA0000000000012~   201504 PHS0000000~          0    APAC ~
##  3 TS0000000000~ PA0000000000012~   201504 PHS0000000~          1.06 APAC ~
##  4 TS0000000000~ PA0000000000029~   201806 PHS0000000~          0    APAC ~
##  5 TS0000000000~ PA0000000000029~   201711 PHS0000000~          0.01 APAC ~
##  6 TS0000000000~ PA0000000000029~   201707 PHS0000000~          0.22 APAC ~
##  7 TS0000000000~ PA0000000000029~   201807 PHS0000000~          0    APAC ~
##  8 TS0000000000~ PA0000000000029~   201708 PHS0000000~          0    APAC ~
##  9 TS0000000000~ PA0000000000029~   201809 PHS0000000~          0    APAC ~
## 10 TS0000000000~ PA0000000000029~   201711 PHS0000000~          0    APAC ~
## # ... with more rows, and 24 more variables: sub_region <chr>,
## #   territory_code <chr>, segment <chr>, product_line_name <chr>,
## #   product_name <chr>, sub_product_name <chr>, service_group_name <chr>,
## #   product_company_name <chr>, is_payer_account_bill_in_90_days <chr>,
## #   is_payer_account_enterprise <chr>,
## #   is_payer_account_domain_free_domain <chr>,
## #   is_payer_account_on_enterprise_support <chr>,
## #   is_payer_account_on_business_support <chr>,
## #   is_payer_account_fraud <chr>, is_payer_account_suspended <chr>,
## #   is_payer_account_internal <chr>,
## #   is_payer_account_inp_terminated <chr>,
## #   is_payer_account_on_behalf_of <chr>,
## #   is_payer_account_tax_exempt <chr>, is_payer_account_reseller <chr>,
## #   churn_2016 <int>, churn_2017 <int>, churn_2018 <int>, churn <int>
# Column names of the churn_analysis table.
base::colnames(churn_analysis)
##  [1] "territory_sid"                         
##  [2] "payer_account_id"                      
##  [3] "month_id"                              
##  [4] "product_sid"                           
##  [5] "sales_revenue"                         
##  [6] "region"                                
##  [7] "sub_region"                            
##  [8] "territory_code"                        
##  [9] "segment"                               
## [10] "product_line_name"                     
## [11] "product_name"                          
## [12] "sub_product_name"                      
## [13] "service_group_name"                    
## [14] "product_company_name"                  
## [15] "is_payer_account_bill_in_90_days"      
## [16] "is_payer_account_enterprise"           
## [17] "is_payer_account_domain_free_domain"   
## [18] "is_payer_account_on_enterprise_support"
## [19] "is_payer_account_on_business_support"  
## [20] "is_payer_account_fraud"                
## [21] "is_payer_account_suspended"            
## [22] "is_payer_account_internal"             
## [23] "is_payer_account_inp_terminated"       
## [24] "is_payer_account_on_behalf_of"         
## [25] "is_payer_account_tax_exempt"           
## [26] "is_payer_account_reseller"             
## [27] "churn_2016"                            
## [28] "churn_2017"                            
## [29] "churn_2018"                            
## [30] "churn"
# Column types and the first values of each column.
dplyr::glimpse(churn_analysis)
## Observations: ??
## Variables: 30
## $ territory_sid                          <chr> "TS000000000000005494",...
## $ payer_account_id                       <chr> "PA000000000001277761",...
## $ month_id                               <int> 201504, 201503, 201708,...
## $ product_sid                            <chr> "PHS000000000000058062"...
## $ sales_revenue                          <dbl> 0.000000, 1.929999, 0.0...
## $ region                                 <chr> "APAC Region 2", "APAC ...
## $ sub_region                             <chr> "Sub Region106", "Sub R...
## $ territory_code                         <chr> "Territory Code13198", ...
## $ segment                                <chr> "APAC Segment 5", "APAC...
## $ product_line_name                      <chr> "Product Line 13", "Pro...
## $ product_name                           <chr> "Product 11423", "Produ...
## $ sub_product_name                       <chr> "Sub Product 11", "Sub ...
## $ service_group_name                     <chr> "Service Group 18", "Se...
## $ product_company_name                   <chr> "Product Company 1", "P...
## $ is_payer_account_bill_in_90_days       <chr> "N", "N", "Y", "Y", "Y"...
## $ is_payer_account_enterprise            <chr> "N", "N", "N", "N", "N"...
## $ is_payer_account_domain_free_domain    <chr> "Y", "Y", "Y", "Y", "Y"...
## $ is_payer_account_on_enterprise_support <chr> "N", "N", "N", "N", "N"...
## $ is_payer_account_on_business_support   <chr> "N", "N", "N", "N", "N"...
## $ is_payer_account_fraud                 <chr> "N", "N", "N", "N", "N"...
## $ is_payer_account_suspended             <chr> "N", "N", "N", "N", "N"...
## $ is_payer_account_internal              <chr> "N", "N", "N", "N", "N"...
## $ is_payer_account_inp_terminated        <chr> "Y", "Y", "N", "N", "N"...
## $ is_payer_account_on_behalf_of          <chr> "N", "N", "N", "N", "N"...
## $ is_payer_account_tax_exempt            <chr> "N", "N", "N", "N", "N"...
## $ is_payer_account_reseller              <chr> "N", "N", "N", "N", "N"...
## $ churn_2016                             <int> 0, 0, 0, 0, 0, 0, 0, 0,...
## $ churn_2017                             <int> 0, 0, 0, 0, 0, 0, 0, 0,...
## $ churn_2018                             <int> 0, 0, 0, 0, 0, 0, 0, 0,...
## $ churn                                  <int> 0, 0, 0, 0, 0, 0, 0, 0,...
# Total number of rows in the table (the count is run by the database).
dplyr::tally(churn_analysis)
## # Source:   lazy query [?? x 1]
## # Database: postgres
## #   [awsuser@redshift-cluster-2.c3s1z6vmjxf5.ap-southeast-1.redshift.amazonaws.com:5439/dev]
##   n              
##   <S3: integer64>
## 1 12599760
# ---- 1% exploratory sample -------------------------------------------------
# NOTE: set.seed() seeds R's RNG only. The sample below is drawn with the
# database's RANDOM(), so this call does not make the sample reproducible.
set.seed(1)
# Keep every column except product_line_name (same order as the table), tag
# each row with a random number generated in the database, and keep the rows
# where it is below 0.01. collapse() nests the query in a sub-select so the
# new column `x` can be filtered on.
query1 <- churn_analysis %>%
  select(
    territory_sid:segment,
    starts_with("is_payer_account_"),
    product_name:product_company_name,
    churn_2016:churn
  ) %>%
  mutate(x = random()) %>%
  collapse() %>%
  filter(x < 0.01)
show_query(query1)
## <SQL>
## SELECT *
## FROM (SELECT "territory_sid", "payer_account_id", "month_id", "product_sid", "sales_revenue", "region", "sub_region", "territory_code", "segment", "is_payer_account_bill_in_90_days", "is_payer_account_enterprise", "is_payer_account_domain_free_domain", "is_payer_account_on_enterprise_support", "is_payer_account_on_business_support", "is_payer_account_fraud", "is_payer_account_suspended", "is_payer_account_internal", "is_payer_account_inp_terminated", "is_payer_account_on_behalf_of", "is_payer_account_tax_exempt", "is_payer_account_reseller", "product_name", "sub_product_name", "service_group_name", "product_company_name", "churn_2016", "churn_2017", "churn_2018", "churn", RANDOM() AS "x"
## FROM (SELECT "territory_sid", "payer_account_id", "month_id", "product_sid", "sales_revenue", "region", "sub_region", "territory_code", "segment", "is_payer_account_bill_in_90_days", "is_payer_account_enterprise", "is_payer_account_domain_free_domain", "is_payer_account_on_enterprise_support", "is_payer_account_on_business_support", "is_payer_account_fraud", "is_payer_account_suspended", "is_payer_account_internal", "is_payer_account_inp_terminated", "is_payer_account_on_behalf_of", "is_payer_account_tax_exempt", "is_payer_account_reseller", "product_name", "sub_product_name", "service_group_name", "product_company_name", "churn_2016", "churn_2017", "churn_2018", "churn"
## FROM "churn_analysis") "gjoxfxyrqb") "ferjumszju"
## WHERE ("x" < 0.01)
# Download the sampled rows into a local data frame.
samp1 <- collect(query1)
dim(samp1)
## [1] 125438     30
# Number of missing values in each column of the local sample.
vapply(samp1, function(column) sum(is.na(column)), integer(1))
##                          territory_sid 
##                                      0 
##                       payer_account_id 
##                                      0 
##                               month_id 
##                                      0 
##                            product_sid 
##                                      0 
##                          sales_revenue 
##                                      0 
##                                 region 
##                                      0 
##                             sub_region 
##                                      0 
##                         territory_code 
##                                      0 
##                                segment 
##                                      0 
##       is_payer_account_bill_in_90_days 
##                                      0 
##            is_payer_account_enterprise 
##                                      0 
##    is_payer_account_domain_free_domain 
##                                      0 
## is_payer_account_on_enterprise_support 
##                                      0 
##   is_payer_account_on_business_support 
##                                      0 
##                 is_payer_account_fraud 
##                                      0 
##             is_payer_account_suspended 
##                                      0 
##              is_payer_account_internal 
##                                      0 
##        is_payer_account_inp_terminated 
##                                      0 
##          is_payer_account_on_behalf_of 
##                                      0 
##            is_payer_account_tax_exempt 
##                                      0 
##              is_payer_account_reseller 
##                                      0 
##                           product_name 
##                                      0 
##                       sub_product_name 
##                                      0 
##                     service_group_name 
##                                      0 
##                   product_company_name 
##                                      0 
##                             churn_2016 
##                                      0 
##                             churn_2017 
##                                      0 
##                             churn_2018 
##                                      0 
##                                  churn 
##                                      0 
##                                      x 
##                                      0
# ---- Monthly transaction counts, one chart per year ------------------------
# Bar chart of the number of sampled transactions in each month of `year`,
# with each bar's count printed on it.
#   data: data frame with an integer `month_id` column in YYYYMM form.
#   year: four-digit calendar year, e.g. 2015.
# Returns the ggplot object (auto-printed at top level).
# Replaces four copy-pasted charts. All four were titled "Yr 2018 transactions
# by month" whatever year they showed. Rows outside the year were also
# discarded by xlim(), which raised a "Removed N rows containing non-finite
# values" warning per layer. Rows are now filtered first; xlim() is kept so
# every chart spans the same YYYY00-YYYY13 axis.
# Base subsetting is used rather than dplyr::filter(), so a `year` column in
# `data` cannot shadow the `year` argument.
plot_monthly_transactions <- function(data, year) {
  first_month <- year * 100
  year_rows <- data[which(data$month_id %/% 100 == year), , drop = FALSE]
  ggplot(data = year_rows, aes(x = month_id)) +
    geom_bar(color = "darkblue", fill = "lightblue") +
    theme(axis.text.x = element_text(angle = 90, hjust = 1)) +
    ggtitle(paste("Yr", year, "transactions by month")) +
    geom_text(stat = "count", aes(label = ..count..), vjust = 2) +
    xlim(first_month, first_month + 13)
}

# Transactions per month, 2015 to 2018.
plot_monthly_transactions(samp1, 2015)
plot_monthly_transactions(samp1, 2016)
plot_monthly_transactions(samp1, 2017)
plot_monthly_transactions(samp1, 2018)

# Calendar year of each transaction: the first four characters of month_id
# (YYYYMM), stored as a factor so it plots as a discrete axis.
samp1$year <- as.factor(substr(samp1$month_id, 1, 4))

# Number of sampled transactions per year. Columns are referenced by bare
# name inside aes(); `samp1$year` there bypasses the plot's `data` argument.
ggplot(samp1, aes(x = year)) +
  geom_bar(color = "darkblue", fill = "lightblue") +
  theme(axis.text.x = element_text(hjust = 1)) +
  ggtitle("No. of transactions by year") +
  geom_text(stat = "count", aes(label = ..count..), vjust = 2)

# ---- Transactions split by churn label --------------------------------------
# Dodged bar chart of sampled transactions per month of `year`, coloured by
# the `churn` value.
#   data: data frame with an integer `month_id` column (YYYYMM) and `churn`.
#   year: four-digit calendar year, e.g. 2015.
# Returns the ggplot object (auto-printed at top level).
# Replaces four copy-pasted charts that used `samp1$...` inside aes(). Bare
# column names keep the aesthetics tied to the plot's data, and the legend
# now reads "churn" instead of "as.factor(samp1$churn)". Rows are filtered to
# the year first, so xlim() no longer discards rows with a warning.
# Base subsetting avoids a `year` column shadowing the `year` argument.
plot_monthly_churn <- function(data, year) {
  first_month <- year * 100
  year_rows <- data[which(data$month_id %/% 100 == year), , drop = FALSE]
  ggplot(data = year_rows, aes(x = month_id)) +
    geom_bar(aes(fill = as.factor(churn)), position = "dodge") +
    theme(axis.text.x = element_text(hjust = 1)) +
    ggtitle(paste0("No. of ", year, " transactions by churn")) +
    labs(fill = "churn") +
    xlim(first_month, first_month + 13)
}

# Monthly transactions by churn label, 2015 to 2018.
plot_monthly_churn(samp1, 2015)
plot_monthly_churn(samp1, 2016)
plot_monthly_churn(samp1, 2017)
plot_monthly_churn(samp1, 2018)

# Yearly transactions by churn label; `year` is the factor column derived
# from month_id earlier in the script.
ggplot(data = samp1, aes(x = year)) +
  geom_bar(aes(fill = as.factor(churn)), position = "dodge") +
  theme(axis.text.x = element_text(hjust = 1)) +
  ggtitle("No. of yearly transactions by churn") +
  labs(fill = "churn")

# ---- Train / validation sample ----------------------------------------------
# NOTE: set.seed() seeds R's RNG only. RANDOM() below runs in the database,
# so this sample and its split are not reproducible across runs.
set.seed(1)

# Fraction of labelled rows to sample. Rows with x < query2_frac / 2 become
# training data; the rest of the sample becomes validation data.
# Fix: the sample used to be filtered with `x < 0.02`, while rows were labelled
# 'train' when `x < params$query2_frac / 2` (0.1 / 2 = 0.05 in the knitted
# run). Every sampled row fell below the split point, so all rows were 'train'
# and valid1 was empty. Both cut-offs now come from one value. This also
# removes the dependency on R Markdown `params`, which is undefined when the
# script is run with source().
query2_frac <- 0.02

# Columns are listed explicitly, in the original order, so downstream column
# positions are unchanged. `!!` inlines the local R values into the SQL.
query2 <- churn_analysis %>%
  select(territory_sid, payer_account_id, month_id, product_sid,
         sales_revenue, region, sub_region, territory_code, segment,
         is_payer_account_bill_in_90_days, is_payer_account_enterprise,
         is_payer_account_domain_free_domain,
         is_payer_account_on_enterprise_support,
         is_payer_account_on_business_support, is_payer_account_fraud,
         is_payer_account_suspended, is_payer_account_internal,
         is_payer_account_inp_terminated, is_payer_account_on_behalf_of,
         is_payer_account_tax_exempt, is_payer_account_reseller,
         product_name, sub_product_name, service_group_name,
         product_company_name, churn_2016, churn_2017, churn_2018,
         churn) %>%
  filter(!is.na(churn)) %>%
  mutate(x = random()) %>%
  collapse() %>%
  filter(x < !!query2_frac) %>%
  mutate(data = if (x < !!(query2_frac / 2)) "train" else "valid") %>%
  select(-x)
samp2 <- collect(query2)

train1 <- filter(samp2, data == "train")
valid1 <- filter(samp2, data == "valid")
# Fail loudly on a degenerate split (the symptom of the bug fixed above).
stopifnot(nrow(train1) > 0, nrow(valid1) > 0)

train1$data <- NULL
valid1$data <- NULL

# NOTE(review): the split is per row, so the same payer_account_id can appear
# in both train1 and valid1. Consider splitting by account; confirm intent.

# Generated SQL (the captured output from an earlier run showed the old
# 0.1 / 2.0 split threshold and has been removed; re-run to regenerate).
show_query(query2)
# ---- Feature frame for the random-forest classifier --------------------------
# Keep the categorical account/product attributes and the `churn` label; drop
# identifiers, month, revenue and the per-year churn flags.
# Columns are dropped by name rather than by position (previously
# `train1[, -c(1:5, 26:28)]`), so the selection no longer silently depends on
# the column order returned by query2.
train2 <- select(
  train1,
  -c(territory_sid, payer_account_id, month_id, product_sid, sales_revenue,
     churn_2016, churn_2017, churn_2018)
)
# Row counts per region and per sub-region. Column names must be unquoted:
# `count(train1, 'region')` counted a constant string and returned a single
# row holding the total row count.
count(train1, region)
count(train1, sub_region)
# Convert every column to a factor for the random-forest classifier.
# Assigning into `train2[]` keeps train2 a data frame; `train2 <- lapply(...)`
# turned it into a plain list.
train2[] <- lapply(train2, as.factor)
lapply(train2, class)
## $region
## [1] "factor"
## 
## $sub_region
## [1] "factor"
## 
## $territory_code
## [1] "factor"
## 
## $segment
## [1] "factor"
## 
## $is_payer_account_bill_in_90_days
## [1] "factor"
## 
## $is_payer_account_enterprise
## [1] "factor"
## 
## $is_payer_account_domain_free_domain
## [1] "factor"
## 
## $is_payer_account_on_enterprise_support
## [1] "factor"
## 
## $is_payer_account_on_business_support
## [1] "factor"
## 
## $is_payer_account_fraud
## [1] "factor"
## 
## $is_payer_account_suspended
## [1] "factor"
## 
## $is_payer_account_internal
## [1] "factor"
## 
## $is_payer_account_inp_terminated
## [1] "factor"
## 
## $is_payer_account_on_behalf_of
## [1] "factor"
## 
## $is_payer_account_tax_exempt
## [1] "factor"
## 
## $is_payer_account_reseller
## [1] "factor"
## 
## $product_name
## [1] "factor"
## 
## $sub_product_name
## [1] "factor"
## 
## $service_group_name
## [1] "factor"
## 
## $product_company_name
## [1] "factor"
## 
## $churn
## [1] "factor"
# Response for classification: `churn` must be a factor. Rebuilding the
# factor from its character form drops any unused levels.
train2$churn <- factor(as.character(train2$churn))
# Model fit, kept commented out:
#   * Use `churn ~ .`. With `train2$churn ~ .`, the `.` still expands to every
#     column of `data`, including `churn`, so the label would be used as its
#     own predictor.
#   * NOTE(review): randomForest() rejects categorical predictors with more
#     than 53 levels. High-cardinality columns such as territory_code or
#     product_name probably need dropping or re-encoding first; confirm.
# churn_rf <- randomForest(churn ~ ., data = train2)
# churn_rf

# ---- RPostgres usage notes ---------------------------------------------------
# Kept as comments: these are generic RPostgres examples that use a separate
# connection `con` and the built-in `mtcars` data set, not part of this
# analysis. As pasted (prose lines, curly quotes, several statements per
# line) they were invalid R and stopped the whole script from parsing.
#
# Create and inspect a table:
#   dbListTables(con)
#   dbWriteTable(con, "mtcars", mtcars)
#   dbListTables(con)
#
#   dbListFields(con, "mtcars")
#   dbReadTable(con, "mtcars")
#
# You can fetch all results at once:
#   res <- dbSendQuery(con, "SELECT * FROM mtcars WHERE cyl = 4")
#   dbFetch(res)
#   dbClearResult(res)
#
# Or fetch them a chunk at a time:
#   res <- dbSendQuery(con, "SELECT * FROM mtcars WHERE cyl = 4")
#   while (!dbHasCompleted(res)) {
#     chunk <- dbFetch(res, n = 5)
#     print(nrow(chunk))
#   }
#   # Clear the result
#   dbClearResult(res)

# Disconnect from the database. This closes the Redshift connection opened
# at the top of the script; lazy tables built on it cannot be queried after
# this point.
dbDisconnect(pconn_r)