Data Science Pipelines with R and Teradata Aster

Gregory Kanevsky
October 30, 2017

What is a Data Science Pipeline?

(not this)

Not a Data Science Pipeline.

Source: Red Hat Developer Program

What is a Data Science Pipeline?

(could be this)

Data Science Pipeline Workflow.

Source: Tesla Institute

How Did Data Science Pipelines Come About?

  • 4 V's (attributes of Big Data):
    • Volume: Scale of Data
    • Velocity: Speed of Data
    • Variety: Diversity of Data
    • Veracity: (Un)certainty of Data
  • The fifth V: Extracting Business Value from Big Data
  • Value shifts the focus away from merely storing, processing, and managing data

How to Deliver Business Value with Big Data

  • By shifting focus from storing, processing and managing big data
  • to agile enterprise applications encompassing big data
  • By addressing business use cases…
  • By leveraging scalable platform and advanced analytics…
  • By building
    • reproducible,
    • consistent,
    • productionizable applications…
  • …as Data Science Pipelines.

Components of a Data Science Pipeline

  • Connectivity and streaming (e.g. Teradata QueryGrid, connectors)
  • High volume data processing and storage (e.g. Teradata Aster on various platforms: appliance, Hadoop, cloud, software only)
  • Co-located analytical engines: statistics, machine learning, text, graphs, etc. (e.g. Aster SQL-MR, SQL-GR, in-db R)
  • Visualization and reporting for consumption of results (e.g. R packages, App Center)
  • Programming environment that supports logging, alerting, monitoring, testing, distributed execution, deployment, libraries and access to the components above (e.g. R, Python, Java)

How To Build It?

with a Cross-Section of Different Skills:

  • Data Science
  • Programming / Software Development
  • Enterprise / Database Design and Architecture
  • Business / Domain Knowledge

and with R and Aster:

  • R programming environment (desktop or server)
  • Teradata Aster Big Data Database
  • Aster R Integration with TeradataAsterR
  • Other R packages (toaster, ggplot2, igraph, etc.)

Teradata Aster Architecture


Teradata Aster R Primer Environment

  • R programming environment and TeradataAsterR: glue that holds everything together
  • Aster (any platform): distributed data, scale, performance, and ultimate analytics with R runtime enabled
  • Toy dataset: Lahman Baseball Database (Lahman package)
  • Contrasting with equivalent dplyr examples in R for comparison
library(Lahman)
library(dplyr)
library(TeradataAsterR)
library(ggplot2)
library(ggthemes)

ta.connect(dsn=dsn, database=database)
RODBC Connection 1
Details:
  case=nochange
  DSN=PreSalesCluster1-dallas
  DATABASE=baseball

Use Case I: Analyzing Batters and Batting Stats

  • Transforms and filters
  • Summarization
  • Calculating complex metrics
  • Exploratory analysis
  • Calculating Sabermetric stats
  • Dimensionality reduction

How Aster R Represents Data

  • Using virtual objects
  • The look and feel is similar to native R objects
  • A virtual data frame represents data in Aster (table, view, or query) and is similar to an R data frame
  • There are other virtual objects similar to R's factor and vector
batting.ta = ta.data.frame("batting", schemaName = "public")

ta.dim(batting.ta)
ta.nrow(batting.ta)
ta.head(batting.ta)
ta.summary(batting.ta)
ta.colnames(batting.ta) <- c(newnames)

Loading Data into Aster

  • Using the utility ncluster_loader: a scalable, enterprise-friendly option suitable for high-performance batch loads
  • Using an Aster R virtual data frame (see the sketch below):
    • ta.push pushes data into an existing virtual data frame
    • ta.load.csv loads data from a file in .csv format
    • ta.create creates tables in Aster from an R or virtual data frame
    • ta.reify materializes a virtual data frame into a temporary schema
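
A minimal sketch of the virtual data frame loaders (the file and object names here are hypothetical, and the ta.load.csv and ta.push argument names are assumptions, not verified TeradataAsterR signatures):

# Hypothetical CSV load into a new Aster table; argument names
# are modeled on the ta.create call shown on the next slide
teams.ta = ta.load.csv("teams.csv", tableName = "teams",
                       schemaName = "public")

# Push a local R data frame (new_teams_df, hypothetical) into an
# existing virtual data frame
ta.push(teams.ta, new_teams_df)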

Loading Baseball Data into Aster

createBaseballData <- function(data, table, schema, 
                               partitionKey, analyzeFlag) {
  t = proc.time()
  # Recreate the table from scratch on every load
  ta.dropTable(tableName = table, schemaName = schema)
  ta.create(data, table = table, schemaName = schema, 
            tableType = "fact",
            partitionKey = partitionKey, 
            row.names = TRUE, analyze = analyzeFlag)
  # Return the elapsed load time
  proc.time() - t
}

# Master table
createBaseballData(Master, "master", "public", "playerID", TRUE)

# Batting table
createBaseballData(Batting, "batting", "public", "playerID", TRUE)

Getting Started with the Lahman Database

# R and Lahman package
dim(Batting); dim(Master)
[1] 102816     22
[1] 19105    26
# Aster R
batting.ta = ta.data.frame("batting")
master.ta = ta.data.frame("master")

ta.dim(batting.ta); ta.dim(master.ta)
[1] 101332     23
[1] 18846    27

Pipeline and TDD: Always Test Every Step

  • Example: validating data load
compareDataInRandAster <- function(df, ta, key, columns=names(df)) {

  # Exclude factor columns that do not round-trip cleanly from Aster
  columns = setdiff(unique(c(key, columns)), 
                    c("lgID","teamID","bats","throws"))
  # Sort both versions by the key before comparing
  data_in_R = df[, columns] %>% arrange_(.dots=setNames(key,key))
  data_from_Aster = as.data.frame(ta[ , columns], 
    stringsAsFactors = FALSE) %>% arrange_(.dots=setNames(key,key))

  all.equal(data_in_R, data_from_Aster)
}

compareDataInRandAster(Master, master.ta, key = c("playerID"))
[1] TRUE
compareDataInRandAster(Batting, batting.ta, key = c("playerID", 
                                  "yearID", "stint"))
[1] TRUE

Types of Joins Supported by Aster R

  • Inner join
  • Left and right outer joins
  • Full outer join
  • Semi-join
  • Anti-join (opposite of semi-join)
  • Cross join (Cartesian product)
  • Data is always processed in Aster and never crosses the network unless explicitly requested (contrast with the local dplyr analogues below)
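
For comparison, the local dplyr analogues of these join types (these run in R memory, unlike ta.join, which stays in-database):

# dplyr join varieties on the same Lahman tables
inner.tbl = Batting %>% inner_join(Master, by = "playerID")
left.tbl  = Batting %>% left_join(Master, by = "playerID")
full.tbl  = Batting %>% full_join(Master, by = "playerID")
semi.tbl  = Batting %>% semi_join(Master, by = "playerID")  # keeps Batting columns only
anti.tbl  = Batting %>% anti_join(Master, by = "playerID")  # rows with no match in Master
# Cross join (Cartesian product) in base R: merge with by = NULL
cross.df  = merge(head(Batting), head(Master), by = NULL)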

Join in R and Aster

# R
big.df = merge(Batting, Master, by="playerID")

# dplyr
big.tbl = Batting %>%
  inner_join(Master, by="playerID")

# Aster R
big.ta = ta.join(batting.ta, master.ta, by="playerID")
ta.colnames(big.ta)[c('x.row_names','x.playerID')] = c('row_names','playerID')

compareDataInRandAster(big.df, big.tbl, 
  key = c("playerID", "yearID", "stint"))
[1] TRUE
compareDataInRandAster(big.tbl, big.ta, 
  key = c("playerID", "yearID", "stint"))
[1] TRUE

Data Manipulation: Subset and Transform

batting_metrics = c("G", "AB", "R", "H", "HR", "BB", "SO")

# dplyr
big.prepped.df = big.df %>% 
  mutate(key = paste(playerID, yearID, stint, sep="-"),
         age = yearID - birthYear) %>% 
  select_(quote(c(key, playerID, yearID, stint, age)), 
          .dots=batting_metrics) %>%
  filter(!is.na(HR) & yearID >= 1995)

# Aster R
big.prepped.ta = ta.transform(big.ta, 
    key = paste(playerID, yearID, stint, sep="-"),
    age = yearID - birthYear)
big.prepped.ta = ta.subset(big.prepped.ta[,c("key","playerID","yearID","stint","age",
    batting_metrics, "row_names")],
    !is.na(HR) & yearID >= 1995)

compareDataInRandAster(big.prepped.df, big.prepped.ta, key = "key")
[1] TRUE

Summarization

Summarization with Aster R may utilize the SQL, SQL-MR, or R execution
engines, always running in-database:

  • SQL GROUP BY compatible operations, e.g. MAX, MIN, AVG, etc.: ta.summarise or ta.summarise.each
  • SQL window functions using ta.transform
  • in-database R with ta.tapply or ta.by as the most flexible option

SQL GROUP BY with ta.summarise

Aggregate stats for each year over players' stints:

# dplyr
summary1.df = big.prepped.df %>% group_by(playerID, yearID, age) %>%
  summarise_each_(funs(sum), batting_metrics) 

# Aster R
summary1.ta = ta.summarise(big.prepped.ta, group.by = c("playerID", "yearID", "age"),
  G=sum(G), AB=sum(AB), R=sum(R), H=sum(H), HR=sum(HR), BB=sum(BB), SO=sum(SO))

#summary1.ta = ta.summarise.each(tempforsummarise.ta, group.by = c("playerID"),
#                                funs = c("sum"), columns = batting_metrics)

compareDataInRandAster(summary1.df, summary1.ta, key = c("playerID","yearID"))
[1] TRUE

Window Functions with ta.transform

# dplyr
window.df = summary1.df %>% group_by(playerID) %>%
  mutate(seasonsTotal=n(),
    currentSeason=with_order(order_by = yearID, fun = row_number, x = yearID),
    currentTopHR=with_order(order_by = yearID, fun = cummax, x = HR),
    topHR=max(HR), startCareerYear = min(yearID), endCareerYear = max(yearID))

# Aster R
window.ta = ta.transform(summary1.ta, 
  seasonsTotal=n(partition = playerID),
  currentSeason=row_number(order = yearID, partition = playerID),
  currentTopHR=cummax(HR, order = yearID, partition = playerID),
  topHR=max(HR, partition = playerID),
  startCareerYear = min(yearID, partition=playerID), 
  endCareerYear = max(yearID, partition=playerID))

compareDataInRandAster(window.df, window.ta,
  key = c("playerID", "yearID"))
[1] TRUE

In-Database R aggregates with ta.tapply

At what age did each player hit the most home runs (the HR career age)?

# dplyr
summary2.df = window.df %>%
  group_by(playerID) %>%
  summarize(
    seasons_total = seasonsTotal[[1]],
    max_hr = max(HR),
    top_hr_age = age[HR == max(HR)][[1]])

# Aster R
summary2.ta = ta.tapply(window.ta, INDEX=window.ta$playerID, 
  FUN=function(x){c(x$seasonsTotal[[1]], max(x$HR), 
                    x$age[x$HR == max(x$HR)][[1]])},
  out.tadf = list(columns=c("playerID", "seasons_total",
                            "max_hr", "top_hr_age")))

compareDataInRandAster(summary2.df, summary2.ta, key = "playerID")
[1] TRUE

HR Career Age Distributions


ggplot(as.data.frame(
  aa.hist(data=ta.subset(summary2.ta, seasons_total >= 5 & max_hr >= 10),
          value.column="top_hr_age", bin.size=1, 
          start.value=18, end.value=45)[[1]])) +
  geom_bar(aes(start_bin, frequency), stat='identity') +
  labs(title='Players with Total Seasons >= 5 and max. HR >= 10',
       x='Age', y='Count')

Dimensionality Reduction with PCA

  • Including data science processes and models in the pipeline is natural
  • They become an integral part of the workflow
  • The same principles of reproducibility, consistency, and operability apply
  • Examples with PCA and Logistic Regression

Sabermetric Batting Statistics

batting.new=ta.transform(batting.ta,
                         key=paste(playerID,yearID, sep='-'))
batting.new=ta.summarise(batting.new, group.by = c("key"),
  playerID=min(playerID), yearID=min(yearID), G=sum(G), AB=sum(AB), 
  R=sum(R), H=sum(H), X2B=sum(X2B), X3B=sum(X3B), HR=sum(HR), 
  RBI=sum(RBI), SB=sum(SB), CS=sum(CS), BB=sum(BB), SO=sum(SO), 
  IBB=sum(IBB), HBP=sum(HBP), SH=sum(SH), SF=sum(SF), GIDP=sum(GIDP))
batting.new=ta.transform(ta.subset(batting.new, AB>30 & yearID > 1995), 
  TB = H + X2B + 2*(X3B) + 3*(HR), BA = H / AB,
  OBP = (H + BB + HBP) / (AB + BB + HBP + SF),
  SLG = (H + X2B + 2*(X3B) + 3*(HR)) / AB)
batting.new=ta.transform(batting.new,
  OPS = OBP + SLG, TA = (TB + BB + HBP + SB - CS)/(AB - H + CS + GIDP),
  ISO = SLG - BA, SECA = (TB - H + BB + SB - CS) / AB,
  RC=(H+BB+HBP-CS-GIDP)*(TB+0.26*(BB-IBB+HBP)+0.52*(SH+SF+SB)) /
    (AB + BB + HBP + SH + SF))
batting.new=ta.transform(batting.new, 
  RC27=RC/((AB-H+SH+SF+CS+GIDP)/27))

Materializing Virtual Data Frame in Aster

ta.tableType(batting.new)
[1] NA
ta.dropTable("batting_for_pca", schemaName = "public")
[1] -2
ta.create(ta.subset(batting.new, H >= 1), "batting_for_pca", 
          schemaName = "public", tableType = "fact", 
          partitionKey = "playerID", row.names = TRUE)
batting.pca = ta.data.frame("batting_for_pca")
ta.tableType(batting.pca)
  schemaname       tablename tabletype partitionkey
1     public batting_for_pca      fact     playerID
ta.dim(batting.pca)
[1] 11796    31

Running PCA (prcomp and Aster SQL-MR)

data_in_aster_for_pca = batting.pca[,c("TB","OBP","SLG","OPS","ISO","RC","RC27")]

# PCA via in-database R: ta.apply with MARGIN = c() applies FUN
# to the data set as a whole inside Aster
pca.result = ta.apply(data_in_aster_for_pca, MARGIN = c(), 
  FUN = function(x){prcomp(x, retx=TRUE, center=TRUE, scale.=TRUE)})
class(pca.result) = "prcomp"

# PCA in R
pca.result.R = prcomp(as.data.frame(data_in_aster_for_pca), 
  retx=TRUE, center=TRUE, scale.=TRUE)

# PCA in Aster (SQL-MR)
ttt = aa.scale.map(data=data_in_aster_for_pca,
  input.columns=c("TB","OBP","SLG","OPS","ISO","RC","RC27")) 
scale.result.tr = aa.scale(data=data_in_aster_for_pca, 
                           object=ttt, method="std")[[1]]
pca.result.Aster = as.data.frame(aa.pca(scale.result.tr)[[1]])

Comparing PCA Results

(plots comparing the prcomp, in-database R, and SQL-MR PCA results)

Use Case II: Predicting Player Position

Example of complete workflow including:

  • Data load/sourcing
  • Data transformations/filters/summarization
  • Join
  • Random sampling and partitioning
  • Predictive model (Logistic Regression)
  • Model evaluation (ROC)

Fielding: Data Load and Transformations

createBaseballData(Fielding, "fielding", "public", "playerID", TRUE)

fielding = ta.data.frame("fielding")

fielding.tadf = ta.subset(fielding, yearID >= 1990 &
  lgID == 'NL' & G >= 20)[, c("playerID","yearID","POS")]

fielding.tadf = ta.unique(fielding.tadf)

# Binary target: is the player a pitcher?
fielding.tadf = ta.transform(fielding.tadf, 
  ispitcher = if(POS=="P") 1 else 0)

Batting: New Stats and Data Transformations

batting = ta.data.frame("batting")

batting.summary.tadf = ta.summarise(batting, group.by = c("playerID", "yearID", "lgID"),
  G=sum(G), AB=sum(AB), R=sum(R), H=sum(H), X2B=sum(X2B), X3B=sum(X3B), HR=sum(HR), BB=sum(BB), 
  SO=sum(SO), HBP=sum(HBP), SB=sum(SB), CS=sum(CS), GIDP=sum(GIDP), SF=sum(SF))
batting.tadf = ta.subset(batting.summary.tadf, yearID >= 1990 & lgID == 'NL' & G >= 20 & AB > H)

batting.enh.tadf = ta.transform(batting.tadf, 
  BA=H/AB, SLG=(H+X2B+2*X3B+3*HR)/AB,
  TA=(H+X2B+2*X3B+3*HR+BB+HBP+SB-CS)/(AB-H+CS+GIDP),
  OBP=(H+BB+HBP)/(AB+BB+HBP+SF),
  OPS=(AB*(H+BB+HBP)+(H+X2B+2*X3B+3*HR)*(AB+BB+SF+HBP))/
    (AB*(AB+BB+SF+HBP)))

Consolidating Data Sets

all.tadf = ta.join(fielding.tadf, batting.enh.tadf, by=c("playerID","yearID"))
all.tadf = ta.transform(all.tadf, id = paste(x.playerID, x.yearID, sep="-"), playerID=x.playerID)

ta.dropTable("my_simplemodel_data", schemaName = "public")
[1] -2
ta.create(all.tadf, "my_simplemodel_data", schemaName = "public", tableType = "fact", partitionKey = "playerID",
          row.names = TRUE)
all.tadf = ta.data.frame("my_simplemodel_data", schemaName = "public")
ta.dim(all.tadf)
[1] 13280    29

Splitting Data into Training and Testing Datasets for Machine Learning

This is the most simplistic approach to partitioning the data:

data_size = ta.dim(all.tadf)[[1]]
mlsets = aa.random.sample(all.tadf, num.sample = c(data_size*.8,
                                                   data_size*.2))[[1]]

ta.table(mlsets$set_id, mlsets$ispitcher)
  set_id ispitcher Freq
1      0         0 7710
2      0         1 2914
3      1         0 1933
4      1         1  723
train.tadf = ta.subset(mlsets, set_id == 0)
test.tadf = ta.subset(mlsets, set_id == 1)

GLM Model (Logistic Regression)

pitcher.glm.model  = aa.glm(
  formula=(ispitcher ~ BA + SLG + TA),
  family='binomial',
  data=train.tadf,
  maxit = 50)
pitcher.glm.model$coefficients
  attribute   predictor category    estimate     std_err   z_score     p_value significance   family
1         3          TA       NA    -1.83907 3.57527e-01  -5.14386 2.69152e-07          *** BINOMIAL
2         0 (Intercept)       NA     3.65135 1.08783e-01  33.56540 0.00000e+00          *** BINOMIAL
3        -1      Loglik       NA -2865.56006 1.06240e+04   3.00000 0.00000e+00         <NA> BINOMIAL
4         2         SLG       NA   -19.22090 9.46370e-01 -20.31020 0.00000e+00          *** BINOMIAL
5         1          BA       NA     9.02703 1.10329e+00   8.18189 4.44089e-16          *** BINOMIAL

Testing GLM Model

glm.result = aa.glm.predict(object=pitcher.glm.model,  
                            newdata = test.tadf,
                            terms = c("id", "ispitcher"))
ta.head(glm.result[[1]])
              id ispitcher fitted_value
1 guerrwi01-1999         0   0.06976775
2 alicelu01-1994         0   0.01541507
3 mitchke02-1991         0   0.05801503
4 torreca01-2014         1   0.97470061
5 mackoro01-2002         0   0.02407595
6   bayja01-2012         0   0.18190595

Model Evaluation with ROC

glm.result.df = as.data.frame(glm.result$result) 
roc = data.frame(fpos = numeric(0), tpos = numeric(0))
for(threshold in seq(0., 1., 0.01)) {
  # Pad with one (TRUE,1) and one (FALSE,0) observation so the
  # 2x2 confusion matrix always has all four cells
  cm = t(table(c(glm.result.df$fitted_value > threshold, 
                 TRUE, FALSE),
               c(glm.result.df$ispitcher, 1, 0)))
  # Row 1 is actual 0 (false positive rate);
  # row 2 is actual 1 (true positive rate)
  roc = rbind(roc, c(fpos=cm[1,2]/sum(cm[1,]), 
                     tpos=cm[2,2]/sum(cm[2,])))
}

names(roc) = c('fpos','tpos')
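
The AUC referenced on the next slide can be approximated directly from these points with the trapezoidal rule (a sketch, not part of the original pipeline):

# Sort the ROC points by false positive rate, then integrate
# tpos over fpos with the trapezoidal rule
roc.sorted = roc[order(roc$fpos), ]
auc = sum(diff(roc.sorted$fpos) *
          (head(roc.sorted$tpos, -1) + tail(roc.sorted$tpos, -1)) / 2)
auc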

ROC and AUC

(ROC curve plot)
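
The curve can be redrawn from the roc data frame computed on the previous slide (a sketch; the title text is illustrative, and theme_tufte comes from the ggthemes package loaded earlier):

# ROC curve with a random-classifier diagonal for reference
ggplot(roc, aes(x = fpos, y = tpos)) +
  geom_line() +
  geom_abline(slope = 1, intercept = 0, linetype = "dashed") +
  labs(title = "ROC: Predicting Pitchers from Batting Stats",
       x = "False Positive Rate", y = "True Positive Rate") +
  theme_tufte()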

References