Data Science Pipeline with Teradata Aster R

Gregory Kanevsky
March 15, 2017

What is Scalable Data Science?

  • Ability to efficiently manage the data volumes and data sources found in large organizations while delivering value in real business processes
  • Ability to construct, maintain, and evolve complex data pipelines and decision processes at scale in a dynamic environment
  • Ability to iterate, fail fast, and create interpretable and actionable insights

Teradata Aster R (Aster R)

  • Packages TeradataAsterR (supported by Teradata) and toaster (open source on CRAN; see the sketch after this list)
  • Integrate the strengths of Aster SQL and Analytics with the R programming environment
  • A complete tool set within R
  • Analytics on all your data
  • Data managed in the Aster environment
  • Leverage the MPP and distributed architecture
  • Scalable analytics with prebuilt functions and organic R interface
  • Both client-server and in-database processing frameworks
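
For example, the open-source toaster package pushes aggregation into Aster over an RODBC connection and renders the result client-side. A minimal sketch, not from the original deck, assuming toaster's computeHistogram/createHistogram API and the batting table used later in this deck:

# Hedged sketch: histogram computed in-database, plotted in R
library(RODBC)
library(toaster)

ch = odbcConnect("PreSalesCluster1-dallas")   # illustrative DSN
h  = computeHistogram(ch, tableName = "batting", columnName = "ba")
createHistogram(h)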

Aster R Programming

  • Data description, summarization, profiling, and exploratory analysis
  • Data transformation including cleansing, filtering, mapping, and moving
  • Feature selection and inferential analisys
  • Classification and predictive systems
  • Building reproducible, consistent, and maintainable data pipelines

R Driven Data Pipelines with Aster

  1. R Interface
  2. Execution in the Aster database (see the sketch after this list)
    • SQL
    • SQL-MR
    • SQL-GR
    • in-database R
  3. Flow control and logic in R, with intermediate results in both R (client) and the Aster database (server)
  4. Results in reproducible, consistent, and production-ready data pipelines, visualizations, and processes
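
A minimal sketch of both dispatch styles, not from the original deck: sessionize() is an Aster Analytics SQL-MR function, but the clicks table is hypothetical, and the ta.tapply call assumes the in-database R runner is configured (batting is the virtual data frame created later in this deck):

# SQL-MR via ta.Query(); "clicks" is a hypothetical table
ta.Query("SELECT * FROM sessionize(
            ON clicks PARTITION BY userid ORDER BY clicktime
            TIMECOLUMN('clicktime') TIMEOUT(600)) LIMIT 10")

# In-database R: grouped aggregation pushed to the cluster
ta.tapply(batting$ba, INDEX = batting$lgid, FUN = mean)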

Aster R Connection and Execution Environment

# Load the Aster R package and connect to the cluster
library(TeradataAsterR)

AsterR = ta.connect("PreSalesCluster1-dallas")

options(digits = 3)
options(warn = -1)               # suppress unnecessary warnings about row ordering
options(real.time.nrow = TRUE)   # slower tadf creation, but faster analysis later
options(print.sqlmr.query = FALSE)

SQL Execution

# Most SQL statements, including SQL-MR and SQL-Graph function calls, can be executed with ta.Query()
sql = "SELECT * FROM batting ORDER BY yearid DESC LIMIT 2"
ta.Query(sql)
   playerid yearid stint teamid lgid   g  ab  r   h x2b x3b hr rbi sb cs bb so ibb hbp sh sf gidp
1 aybarer01   2015     1    LAA   AL 156 597 74 161  30   1  3  44 15  6 25 73   1   4  7  5   12
2 ackledu01   2015     2    NYA   AL  23  52  6  15   3   2  4  11  0  0  4  7   0   0  0  1    0
# or with RODBC 
sqlQuery(channel = AsterR, query = sql)
   playerid yearid stint teamid lgid   g  ab  r   h x2b x3b hr rbi sb cs bb  so ibb hbp sh sf gidp
1 arenano01   2015     1    COL   NL 157 616 97 177  43   4 42 130  2  5 34 110  13   4  0 11   17
2 anderbr04   2015     1    LAN   NL  31  47  1   4   1   0  0   3  0  0  5  32   0   0  9  0    2

Understand Aster R Environment

# What version of R is installed in the Aster database?
ta.R.Version()['version.string']
$version.string
[1] "R version 3.1.3 (2015-03-09)"
# What is the local R version?
R.Version()['version.string']
$version.string
[1] "R version 3.2.5 (2016-04-14)"
# What packages are installed in the Aster database?
dimnames( ta.installed.packages(lib.loc = NULL, priority = NULL))[[1]]
  [1] "acepack"       "Amelia"        "assertthat"    "base"          "base64enc"     "BH"            "bitops"        "boot"          "broom"         "caTools"       "chron"         "class"         "cluster"       "coda"          "codetools"     "colorspace"    "compiler"      "datasets"      "DBI"           "dichromat"     "digest"        "doParallel"    "dplyr"         "e1071"         "foreach"       "forecast"      "foreign"       "Formula"       "fracdiff"      "geepack"       "geohash"       "ggplot2"       "glmnet"        "graphics"      "grDevices"     "grid"          "gridBase"      "gridExtra"     "gsubfn"        "gtable"        "Hmisc"         "htmltools"     "htmlwidgets"   "httpuv"        "igraph"        "ipred"         "irlba"         "iterators"     "jsonlite"      "KernSmooth"    "labeling"      "lattice"       "latticeExtra"  "lava"          "lazyeval"      "lda"           "lmtest"        "lpSolve"       "lpSolveAPI"    "lubridate"     "magrittr"      "markdown"      "MASS"          "MatchIt"       "Matrix"        "MatrixModels"  "maxent"        "maxLik"        "MCMCpack"      "methods"       "mgcv"          "mime"          "minqa"         "miscTools"    
 [75] "mnormt"        "multcomp"      "munsell"       "mvtnorm"       "nlme"          "NLP"           "NMF"           "nnet"          "numDeriv"      "odfWeave"      "openNLP"       "openNLPdata"   "parallel"      "pkgmaker"      "plyr"          "polspline"     "prodlim"       "proto"         "psych"         "quadprog"      "quantreg"      "R6"            "randomForest"  "raster"        "RColorBrewer"  "Rcpp"          "RcppArmadillo" "RcppEigen"     "registry"      "reshape2"      "rJava"         "Rlof"          "rms"           "rngtools"      "rpart"         "RSQLite"       "RTextTools"    "sandwich"      "scales"        "shiny"         "slam"          "sp"            "SparseM"       "spatial"       "splines"       "sqldf"         "stats"         "stats4"        "stringi"       "stringr"       "survival"      "syuzhet"       "tau"           "tcltk"         "TH.data"       "tidyr"         "timeDate"      "tm"            "tools"         "tree"          "tseries"       "utils"         "VGAM"          "XML"           "xtable"        "yaml"          "zoo"          

Aster and Aster R Objects

# List Aster R objects in the R session
batting = ta.data.frame("batting_enh", schemaName = "public")
fielding = ta.data.frame("fielding")
ta.ls()
[1] "batting"  "fielding"
# and Aster Objects
ta.dbObjects.ls(schemaName="public", type="table")[1:16]
 [1] "app_center_visualizations_33" "app_center_visualizations_47" "app_center_visualizations_57" "app_center_visualizations_8"  "appearances"                  "batting"                      "batting_enh"                  "boston"                       "cel_cdl_location"             "cfilter_test"                 "cfilter_test2"                "cfilter_test3"                "clustermembership"            "collegeplaying"               "count_by_quarter"             "count_day_wk"                

Example: Predict Pitcher by Player's Batting Stats

  • A purely made-up problem on a demo dataset: the Lahman Baseball Database
  • Build a data pipeline that prepares and transforms the data, then builds and evaluates a model
  • Tables: fielding and batting
  • Predict position: P (pitcher) or not P (from fielding)
  • Using batting stats only (from batting; NL players only)

Pipeline: Subsetting Fielding Stats

# Fielding info
fielding.tadf = ta.subset(fielding, yearid >= 1990 & 
                  lgid == 'NL' & g >= 20)[, c("playerid","yearid","teamid","pos")]
ta.show(fielding.tadf, 2)
   playerid yearid teamid pos
1 cartejo01   1990    SDN  CF
2 cartejo01   1990    SDN  LF
ta.table(fielding.tadf$pos)
   pos Freq
1   1B  805
2   2B  884
3   3B  889
4    C  956
5   CF  870
6   LF 1115
7   OF 2584
8    P 4804
9   RF  931
10  SS  814

Pipeline: Transforming (Calculating a New Attribute)

# Derive the binary target: 1 if the position is pitcher, 0 otherwise
fielding.tadf = ta.transform(fielding.tadf, 
                    ispitcher = if(pos=="P") 1 else 0)
ta.coltypes(fielding.tadf)
 playerid    yearid    teamid       pos ispitcher 
"varchar"     "int" "varchar" "varchar" "numeric" 
ta.show(fielding.tadf, 7)
   playerid yearid teamid pos ispitcher
1 averyst01   1990    ATL   P         1
2  boydoi01   1990    MON   P         1
3 clarkda05   1990    CHN  LF         0
4 clarkda05   1990    CHN  OF         0
5 larkiba01   1990    CIN  SS         0
6 mohorda01   1990    MON   P         1
7 musseje01   1990    NYN   P         1

Pipeline: Subsetting Batting Stats

# Batting info: ba (batting average), slg (slugging), ta (total average),
# obp (on-base percentage), ops (on-base plus slugging)
batting.tadf = ta.subset(batting, 
                      yearid >= 1990 & lgid == 'NL' & g >= 20)[,
  c("playerid","yearid","teamid","ba","slg","ta","obp","ops")]
ta.show(batting.tadf, 7)
   playerid yearid teamid    ba   slg    ta   obp   ops
1 cormirh01   2001    PHI 0.000 0.000 0.000 0.000 0.000
2 bonifem01   2012    MIA 0.258 0.316 0.695 0.330 0.645
3 larocad01   2007    PIT 0.272 0.458 0.753 0.345 0.803
4 jonesbo03   1997    NYN 0.129 0.161 0.241 0.169 0.331
5 villaca01   2014    CHN 0.000 0.000 0.000 0.000 0.000
6 edmonji01   2008    SDN 0.178 0.233 0.434 0.265 0.498
7 scutama01   2002    NYN 0.222 0.361 0.400 0.216 0.577

Pipeline: Joining

# Join fielding and batting stats on player, year, and team
all.tadf = ta.join(fielding.tadf, batting.tadf, by=c("playerid","yearid","teamid"))[,1:10]
ta.colnames(all.tadf) = c("pos","ispitcher","ba","slg",
                          "ta","obp","ops","playerid","yearid","teamid")
all.tadf = ta.transform(all.tadf, id = paste(playerid, yearid, teamid, sep="-"))
ta.dim(all.tadf)
[1] 13491    11
ta.show(all.tadf, 3)
  pos ispitcher     ba    slg     ta    obp   ops  playerid yearid teamid                 id
1   P         1 0.1333 0.1333 0.1481 0.1333 0.267 averyst01   1990    ATL averyst01-1990-ATL
2   P         1 0.0508 0.0508 0.0526 0.0508 0.102  boydoi01   1990    MON  boydoi01-1990-MON
3  LF         0 0.2422 0.3168 0.6984 0.3850 0.702 aldremi01   1990    MON aldremi01-1990-MON

Pipeline: Materialize Results

# Drop any previous copy, then materialize the pipeline result as a fact table
ta.dropTable("testdata", schemaName = "public")
[1] -2
ta.create(all.tadf, "testdata", schemaName = "public", tableType = "fact", partitionKey = "playerid",
          row.names = TRUE)

all.tadf = ta.data.frame("testdata", schemaName = "public")
ta.dim(all.tadf)
[1] 13491    12
ta.table(all.tadf$ispitcher)
  ispitcher Freq
1         0 9848
2         1 3643

Pipeline: Training vs. Testing

  • Alternatively, instead of the modulo split below, consider:
    • ta.sample
    • aa.random.sample
# ~90/10 train/test split on the generated row_names column (row.names = TRUE above)
train.tadf = ta.subset(all.tadf, row_names %% 10 <= 8)
ta.dim(train.tadf)
[1] 12142    12
test.tadf = ta.subset(all.tadf, row_names %% 10 > 8)
ta.dim(test.tadf)
[1] 1349   12

Pipeline: Logistic Regression Model

# Fit an in-database logistic regression (GLM, binomial family)
pitcher.glm.model = ta.glm(
  formula=(ispitcher ~ ba + slg + ta),
  family=binomial(),
  data=train.tadf)

pitcher.glm.model$coefficients
  attribute   predictor category estimate  std_err z_score  p_value significance
1         3          ta       NA    -1.34 3.33e-01   -4.01 5.99e-05          ***
2         0 (Intercept)       NA     3.63 1.01e-01   36.06 0.00e+00          ***
3        -1      Loglik       NA -3178.45 1.21e+04    3.00 0.00e+00         <NA>
4         2         slg       NA   -21.34 8.81e-01  -24.22 0.00e+00          ***
5         1          ba       NA    10.71 1.04e+00   10.29 0.00e+00          ***
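
To read the fitted coefficients, score one hypothetical stat line by hand (client-side base R arithmetic, not part of the original deck):

# Hypothetical hitter: ba = .250, slg = .400, ta = .700
logit = 3.63 + 10.71*0.250 - 21.34*0.400 - 1.34*0.700
1 / (1 + exp(-logit))   # ~0.04, i.e. very unlikely to be a pitcher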

Pipeline: Testing Predictive Model

glm.result = ta.glm.predict(object=pitcher.glm.model, 
                            newdata = test.tadf,
                            terms = c("id", "ispitcher"))

head(glm.result$predicted)
                  id ispitcher fitted_value
1 howarry01-2014-PHI         0       0.0504
2 hattesc01-2006-CIN         0       0.0249
3  krukjo01-1990-PHI         0       0.0287
4 velezeu01-2009-SFN         0       0.0528
5 wallati01-1996-LAN         0       0.1527
6 rolensc01-2005-SLN         0       0.0530
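
A quick single-threshold check before sweeping the full ROC curve (illustrative, client-side base R):

# Confusion matrix at a 0.5 cutoff
pred = glm.result$predicted
table(actual = pred$ispitcher, predicted = as.integer(pred$fitted_value > 0.5))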

[Figure: ROC curve for predicting pitcher by batting stats]

Visualization: ROC Curve (source code)

# Build ROC points by sweeping the decision threshold from 0 to 1
roc = data.frame(fpos = numeric(0), tpos = numeric(0))
for(threshold in seq(0., 1., 0.01)) {
  # Pad with one (TRUE, 1) and one (FALSE, 0) observation so the
  # contingency table stays 2x2 even at extreme thresholds
  cm = t(table(c(glm.result$predicted$fitted_value > threshold, TRUE, FALSE),
               c(glm.result$predicted$ispitcher, 1, 0)))
  # Rows: actual class (0, 1); column 2: predicted pitcher
  roc = rbind(roc, c(fpos=cm[1,2]/sum(cm[1,]), tpos=cm[2,2]/sum(cm[2,])))
}

names(roc) = c('fpos','tpos')
ggplot2::ggplot(roc) +
  ggplot2::geom_line(ggplot2::aes(fpos, tpos)) +
  ggplot2::geom_abline(intercept = 0, slope = 1, linetype="dashed") +
  ggplot2::labs(subtitle="ROC Curve", title="Predicting Pitcher by Batting Stats", 
                x="False Positive Rate", y="True Positive Rate") +
  ggthemes::theme_tufte(ticks = FALSE, base_size = 20) +
  ggplot2::xlim(0,1) + ggplot2::ylim(0,1)
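
A natural follow-up is to reduce the curve to a single score; a trapezoidal-rule AUC over the roc points computed above (illustrative, base R, not part of the original deck):

# Approximate AUC by the trapezoidal rule over the swept thresholds
roc.sorted = roc[order(roc$fpos, roc$tpos), ]
sum(diff(roc.sorted$fpos) *
    (head(roc.sorted$tpos, -1) + tail(roc.sorted$tpos, -1)) / 2)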