# Session setup ----
# Show, then clear, whatever objects are in the global environment. rm() only
# removes global objects; attached packages and options are left untouched.
ls()
## character(0)
rm(list = ls())
gc()
##          used (Mb) gc trigger (Mb) max used (Mb)
## Ncells 471538 25.2     940480 50.3   713531 38.2
## Vcells 845104  6.5    1650153 12.6  1023636  7.9
getwd()
## [1] "C:/Users/KOGENTIX/Documents/R"
# This working directory is specific to the original author's machine.
# setwd() errors on a directory that does not exist, so only switch when it
# is present; elsewhere the script keeps the current working directory.
if (dir.exists("C:/Users/KOGENTIX/Documents")) {
  setwd("C:/Users/KOGENTIX/Documents")
}
#install.packages("sparklyr")
library(sparklyr)
# Open a connection to a local Spark instance; `sc` is used by every
# section below.
sc <- spark_connect(master = "local")

#install.packages(c("nycflights13", "Lahman"))
# dplyr supplies the table verbs (filter, select, mutate, ...) used on the
# Spark tables below.
library(dplyr)
## 
## Attaching package: 'dplyr'
## The following objects are masked from 'package:stats':
## 
##     filter, lag
## The following objects are masked from 'package:base':
## 
##     intersect, setdiff, setequal, union
# Upload three local data frames into Spark (overwrite = TRUE replaces any
# table already registered under the same name).
iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
flights_tbl <- copy_to(
  sc,
  nycflights13::flights,
  name = "flights",
  overwrite = TRUE
)
batting_tbl <- copy_to(
  sc,
  Lahman::Batting,
  name = "batting",
  overwrite = TRUE
)
# List the tables now available through the connection.
src_tbls(sc)
## [1] "batting" "flights" "iris"
# Flights that departed exactly two minutes late. The result is a lazy Spark
# query; printing it fetches and shows only the first few rows.
filter(flights_tbl, dep_delay == 2)
## # Source:   lazy query [?? x 19]
## # Database: spark_connection
##     year month   day dep_time sched_dep_time dep_delay arr_time
##    <int> <int> <int>    <int>          <int>     <dbl>    <int>
##  1  2013     1     1      517            515         2      830
##  2  2013     1     1      542            540         2      923
##  3  2013     1     1      702            700         2     1058
##  4  2013     1     1      715            713         2      911
##  5  2013     1     1      752            750         2     1025
##  6  2013     1     1      917            915         2     1206
##  7  2013     1     1      932            930         2     1219
##  8  2013     1     1     1028           1026         2     1350
##  9  2013     1     1     1042           1040         2     1325
## 10  2013     1     1     1231           1229         2     1523
## # ... with more rows, and 12 more variables: sched_arr_time <int>,
## #   arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
## #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## #   minute <dbl>, time_hour <dbl>
# The Spark connection also speaks DBI, so plain SQL can be sent to it.
# Spark replaced the dots in iris column names with underscores on upload.
library(DBI)
iris_preview <- dbGetQuery(
  sc,
  statement = "SELECT * FROM iris LIMIT 10"
)
iris_preview
##    Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## 1           5.1         3.5          1.4         0.2  setosa
## 2           4.9         3.0          1.4         0.2  setosa
## 3           4.7         3.2          1.3         0.2  setosa
## 4           4.6         3.1          1.5         0.2  setosa
## 5           5.0         3.6          1.4         0.2  setosa
## 6           5.4         3.9          1.7         0.4  setosa
## 7           4.6         3.4          1.4         0.3  setosa
## 8           5.0         3.4          1.5         0.2  setosa
## 9           4.4         2.9          1.4         0.2  setosa
## 10          4.9         3.1          1.5         0.1  setosa
# Linear regression on mtcars ----
# Upload mtcars, keep cars with at least 100 hp, add a logical cyl8 flag,
# and split the rows 50/50 into named "training" and "test" tables.
mtcars_tbl <- copy_to(sc, mtcars, overwrite = TRUE)

partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  mutate(cyl8 = cyl == 8) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)

# Regress mpg on wt and cyl using the training split only. The call stays
# piped because the fitted model prints the call it was created with.
fit <- partitions[["training"]] %>%
  ml_linear_regression(response = "mpg", features = c("wt", "cyl"))
## * No rows dropped by 'na.omit' call
fit
## Call: ml_linear_regression(., response = "mpg", features = c("wt", "cyl"))
## 
## Coefficients:
## (Intercept)          wt         cyl 
##   33.499452   -2.818463   -0.923187
summary(fit)
## Call: ml_linear_regression(., response = "mpg", features = c("wt", "cyl"))
## 
## Deviance Residuals::
##    Min     1Q Median     3Q    Max 
## -1.752 -1.134 -0.499  1.296  2.282 
## 
## Coefficients:
##             Estimate Std. Error t value  Pr(>|t|)    
## (Intercept) 33.49945    3.62256  9.2475 0.0002485 ***
## wt          -2.81846    0.96619 -2.9171 0.0331257 *  
## cyl         -0.92319    0.54639 -1.6896 0.1518998    
## ---
## Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
## 
## R-Squared: 0.8274
## Root Mean Squared Error: 1.422
# Run arbitrary R code inside Spark: the closure keeps the first four (numeric)
# columns and shifts all of them by one random Gamma(shape = 2) draw. No seed
# is set, so the printed values change from run to run.
spark_apply(iris_tbl, function(chunk) {
  measurements <- chunk[1:4]
  measurements + rgamma(n = 1, shape = 2)
})
## # Source:   table<sparklyr_tmp_263822501050> [?? x 4]
## # Database: spark_connection
##    Sepal_Length Sepal_Width Petal_Length Petal_Width
##           <dbl>       <dbl>        <dbl>       <dbl>
##  1     10.86204    9.262043     7.162043    5.962043
##  2     10.66204    8.762043     7.162043    5.962043
##  3     10.46204    8.962043     7.062043    5.962043
##  4     10.36204    8.862043     7.262043    5.962043
##  5     10.76204    9.362043     7.162043    5.962043
##  6     11.16204    9.662043     7.462043    6.162043
##  7     10.36204    9.162043     7.162043    6.062043
##  8     10.76204    9.162043     7.262043    5.962043
##  9     10.16204    8.662043     7.162043    5.962043
## 10     10.66204    8.862043     7.262043    5.862043
## # ... with more rows
# K-means with three clusters on the two petal measurements.
kmeans_model <- iris_tbl %>%
  select(Petal_Width, Petal_Length) %>%
  ml_kmeans(centers = 3)
## * No rows dropped by 'na.omit' call
# Show the cluster centres and the within-set sum of squared errors.
print(kmeans_model)
## K-means clustering with 3 clusters
## 
## Cluster centers:
##   Petal_Width Petal_Length
## 1    1.359259     4.292593
## 2    0.246000     1.462000
## 3    2.047826     5.626087
## 
## Within Set Sum of Squared Errors =  31.41289
# Assign every iris row to a cluster, pull the result into R, and
# cross-tabulate the true species against the predicted cluster id.
predicted <- collect(sdf_predict(kmeans_model, iris_tbl))
table(predicted[["Species"]], predicted[["prediction"]])
##             
##               0  1  2
##   setosa      0 50  0
##   versicolor 48  0  2
##   virginica   6  0 44
# Principal component analysis on the four numeric iris columns (everything
# except Species), read back from the "iris" table registered in Spark.
pca_model <- tbl(sc, "iris") %>%
  select(-Species) %>%
  ml_pca()
## * No rows dropped by 'na.omit' call
# Show the proportion of variance per component and the loadings.
print(pca_model)
## Explained variance:
## 
##         PC1         PC2         PC3         PC4 
## 0.924618723 0.053066483 0.017102610 0.005212184 
## 
## Rotation:
##                      PC1         PC2         PC3        PC4
## Sepal_Length -0.36138659 -0.65658877  0.58202985  0.3154872
## Sepal_Width   0.08452251 -0.73016143 -0.59791083 -0.3197231
## Petal_Length -0.85667061  0.17337266 -0.07623608 -0.4798390
## Petal_Width  -0.35828920  0.07548102 -0.54583143  0.7536574
# Random-forest classifier predicting Species from the petal measurements.
rf_model <- iris_tbl %>%
  ml_random_forest(
    Species ~ Petal_Length + Petal_Width,
    type = "classification"
  )
## * No rows dropped by 'na.omit' call
# Score iris_tbl, index the true Species labels into Species_idx so they can
# be compared with the numeric prediction, and collect the result into R.
# NOTE(review): this indexer is fitted separately from the model's own label
# indexing; confirm both assign the same index to each species.
rf_predict <- collect(
  ft_string_indexer(sdf_predict(rf_model, iris_tbl), "Species", "Species_idx")
)

# Rows: true class index; columns: predicted class index.
table(rf_predict[["Species_idx"]], rf_predict[["prediction"]])
##    
##      0  1  2
##   0 49  1  0
##   1  0 50  0
##   2  0  0 50
# Split iris 75/25 into "training" and "test", then regress petal length on
# petal width using the training split.
partitions <- sdf_partition(
  tbl(sc, "iris"),
  training = 0.75,
  test = 0.25,
  seed = 1099
)

fit <- partitions[["training"]] %>%
  ml_linear_regression(Petal_Length ~ Petal_Width)
## * No rows dropped by 'na.omit' call
# Mean squared error of a fitted Petal_Length model on one Spark table.
#
# df:    Spark table holding Petal_Length plus the predictors the model uses.
# model: fitted model handed to sdf_predict(). Defaults to `fit`, looked up
#        when the function is called, so the original one-argument usage
#        (estimate_mse(df)) behaves exactly as before. Pass a model explicitly
#        to avoid depending on a global.
# Returns a one-row local tibble with a single column, `mse`.
estimate_mse <- function(df, model = fit) {
  sdf_predict(model, df) %>%
    mutate(resid = Petal_Length - prediction) %>%
    summarize(mse = mean(resid^2)) %>%
    collect()
}

# MSE on each split. vapply() enforces exactly one numeric value per split
# and returns a named numeric vector (training, test), instead of the
# "training.mse"/"test.mse" list that sapply() happened to simplify to.
vapply(
  partitions,
  function(split_tbl) estimate_mse(split_tbl)[["mse"]],
  numeric(1)
)
##  training      test
## 0.2374596 0.1898848
# Second walk-through: bucketized feature, regression, and a diagnostic plot ----
# sparklyr and dplyr are already attached earlier in this script, so loading
# them again changes nothing in one session. The calls are kept so this
# section can also start from a fresh session (with the connection line
# below uncommented).
library("magrittr")
library("sparklyr")
library("dplyr")
library("ggplot2")
#sc <- spark_connect("local", version = "1.6.1")
# Re-upload mtcars under the explicit table name "mtcars", replacing any
# existing copy.
mtcars_tbl <- copy_to(sc, mtcars, name = "mtcars", overwrite = TRUE)

# Keep cars with at least 100 hp, add cyl8 by bucketing cyl at split points
# 0, 8 and 12, then split the rows 50/50 into "training" and "test".
# (cyl8 is created here but not used by the model below.)
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  sdf_mutate(cyl8 = ft_bucketizer(cyl, c(0, 8, 12))) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 888)

# Fit a linear model to the training dataset; kept as a pipe so the printed
# call below matches.
fit <- partitions[["training"]] %>%
  ml_linear_regression(mpg ~ wt + cyl)
## * No rows dropped by 'na.omit' call
# Summarize the model.
summary(fit)
## Call: ml_linear_regression(., mpg ~ wt + cyl)
## 
## Deviance Residuals::
##     Min      1Q  Median      3Q     Max 
## -2.0947 -1.2747 -0.1129  1.0876  2.2185 
## 
## Coefficients:
##             Estimate Std. Error t value Pr(>|t|)    
## (Intercept) 33.79558    2.67240 12.6462 4.92e-07 ***
## wt          -1.59625    0.73729 -2.1650  0.05859 .  
## cyl         -1.58036    0.49670 -3.1817  0.01115 *  
## ---
## Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
## 
## R-Squared: 0.8267
## Root Mean Squared Error: 1.437
# Score the held-out test split and bring the predictions into R.
pred <- collect(sdf_predict(fit, partitions[["test"]]))

# Predicted vs. actual mpg. The dashed red line is geom_abline()'s default
# identity line (intercept 0, slope 1); points on it are predicted exactly.
# coord_fixed(ratio = 1) keeps both axes on the same scale.
ggplot(pred, aes(x = mpg, y = prediction)) +
  geom_abline(linetype = "dashed", colour = "red") +
  geom_point() +
  theme(plot.title = element_text(hjust = 0.5)) +
  coord_fixed(ratio = 1) +
  labs(
    x = "Actual Fuel Consumption",
    y = "Predicted Fuel Consumption",
    title = "Predicted vs. Actual Fuel Consumption"
  )