# List the objects currently defined in the global environment.
ls()
## character(0)
# The workspace is deliberately NOT cleared with rm(list = ls()) any more:
# that call destroys whatever the caller already had in their session and
# still leaves loaded packages and options in place, so it never produced a
# truly clean state. Run the script in a fresh R session instead.
# Trigger a garbage collection and report memory usage.
gc()
## used (Mb) gc trigger (Mb) max used (Mb)
## Ncells 471538 25.2 940480 50.3 713531 38.2
## Vcells 845104 6.5 1650153 12.6 1023636 7.9
# Show the directory the session starts in.
getwd()
## [1] "C:/Users/KOGENTIX/Documents/R"
# Switch to the hard-coded Documents folder only when it exists. An
# unconditional setwd() on a machine-specific absolute path stops the whole
# script with an error on any computer that lacks that directory.
work_dir <- "C:/Users/KOGENTIX/Documents"
if (dir.exists(work_dir)) {
  setwd(work_dir)
}
#install.packages("sparklyr")
library(sparklyr)
# Open a connection to a local Spark instance; `sc` is reused by the later
# copy_to(), tbl() and dbGetQuery() calls.
sc <- spark_connect(master = "local")
#install.packages(c("nycflights13", "Lahman"))
library(dplyr)
##
## Attaching package: 'dplyr'
## The following objects are masked from 'package:stats':
##
## filter, lag
## The following objects are masked from 'package:base':
##
## intersect, setdiff, setequal, union
# Upload the example data frames to Spark. `iris` is copied without an
# explicit table name; the other two are registered under the names given.
iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
flights_tbl <- copy_to(
  sc,
  nycflights13::flights,
  name = "flights",
  overwrite = TRUE
)
batting_tbl <- copy_to(
  sc,
  Lahman::Batting,
  name = "batting",
  overwrite = TRUE
)
# Show which tables the connection now exposes.
src_tbls(sc)
## [1] "batting" "flights" "iris"
# Keep only the flights whose departure delay equals 2 and print the first
# few records.
filter(flights_tbl, dep_delay == 2)
## # Source: lazy query [?? x 19]
## # Database: spark_connection
## year month day dep_time sched_dep_time dep_delay arr_time
## <int> <int> <int> <int> <int> <dbl> <int>
## 1 2013 1 1 517 515 2 830
## 2 2013 1 1 542 540 2 923
## 3 2013 1 1 702 700 2 1058
## 4 2013 1 1 715 713 2 911
## 5 2013 1 1 752 750 2 1025
## 6 2013 1 1 917 915 2 1206
## 7 2013 1 1 932 930 2 1219
## 8 2013 1 1 1028 1026 2 1350
## 9 2013 1 1 1042 1040 2 1325
## 10 2013 1 1 1231 1229 2 1523
## # ... with more rows, and 12 more variables: sched_arr_time <int>,
## # arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
## # origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## # minute <dbl>, time_hour <dbl>
library(DBI)
# Run a plain SQL statement against the Spark connection through DBI and keep
# the returned rows in a local object.
iris_preview <- dbGetQuery(
  conn = sc,
  statement = "SELECT * FROM iris LIMIT 10"
)
iris_preview
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## 1 5.1 3.5 1.4 0.2 setosa
## 2 4.9 3.0 1.4 0.2 setosa
## 3 4.7 3.2 1.3 0.2 setosa
## 4 4.6 3.1 1.5 0.2 setosa
## 5 5.0 3.6 1.4 0.2 setosa
## 6 5.4 3.9 1.7 0.4 setosa
## 7 4.6 3.4 1.4 0.3 setosa
## 8 5.0 3.4 1.5 0.2 setosa
## 9 4.4 2.9 1.4 0.2 setosa
## 10 4.9 3.1 1.5 0.1 setosa
# Upload mtcars to Spark.
mtcars_tbl <- copy_to(sc, mtcars, overwrite = TRUE)
# Keep cars with hp >= 100, add a logical flag for 8-cylinder cars, then split
# the rows 50/50 into `training` and `test` using a fixed seed.
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  mutate(cyl8 = (cyl == 8)) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)
# Regress mpg on wt and cyl using the training split only.
fit <- partitions$training %>%
  ml_linear_regression(response = "mpg", features = c("wt", "cyl"))
## * No rows dropped by 'na.omit' call
fit
## Call: ml_linear_regression(., response = "mpg", features = c("wt", "cyl"))
##
## Coefficients:
## (Intercept) wt cyl
## 33.499452 -2.818463 -0.923187
summary(fit)
## Call: ml_linear_regression(., response = "mpg", features = c("wt", "cyl"))
##
## Deviance Residuals::
## Min 1Q Median 3Q Max
## -1.752 -1.134 -0.499 1.296 2.282
##
## Coefficients:
## Estimate Std. Error t value Pr(>|t|)
## (Intercept) 33.49945 3.62256 9.2475 0.0002485 ***
## wt -2.81846 0.96619 -2.9171 0.0331257 *
## cyl -0.92319 0.54639 -1.6896 0.1518998
## ---
## Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
##
## R-Squared: 0.8274
## Root Mean Squared Error: 1.422
# Run an R function over iris on the Spark side: take the first four columns
# and add one Gamma(shape = 2) draw (a single value per call) to all of them.
spark_apply(iris_tbl, function(part_df) {
  measurements <- part_df[1:4]
  measurements + rgamma(n = 1, shape = 2)
})
## # Source: table<sparklyr_tmp_263822501050> [?? x 4]
## # Database: spark_connection
## Sepal_Length Sepal_Width Petal_Length Petal_Width
## <dbl> <dbl> <dbl> <dbl>
## 1 10.86204 9.262043 7.162043 5.962043
## 2 10.66204 8.762043 7.162043 5.962043
## 3 10.46204 8.962043 7.062043 5.962043
## 4 10.36204 8.862043 7.262043 5.962043
## 5 10.76204 9.362043 7.162043 5.962043
## 6 11.16204 9.662043 7.462043 6.162043
## 7 10.36204 9.162043 7.162043 6.062043
## 8 10.76204 9.162043 7.262043 5.962043
## 9 10.16204 8.662043 7.162043 5.962043
## 10 10.66204 8.862043 7.262043 5.862043
## # ... with more rows
# Cluster iris on the two petal measurements with k-means (3 centres).
kmeans_model <- iris_tbl %>%
  select(Petal_Width, Petal_Length) %>%
  ml_kmeans(centers = 3)
## * No rows dropped by 'na.omit' call
# Show the fitted model.
print(kmeans_model)
## K-means clustering with 3 clusters
##
## Cluster centers:
## Petal_Width Petal_Length
## 1 1.359259 4.292593
## 2 0.246000 1.462000
## 3 2.047826 5.626087
##
## Within Set Sum of Squared Errors = 31.41289
# Score every iris row with the k-means model, pull the result into local R
# memory, and cross-tabulate species against the assigned cluster.
predicted <- collect(sdf_predict(kmeans_model, iris_tbl))
table(predicted$Species, predicted$prediction)
##
## 0 1 2
## setosa 0 50 0
## versicolor 48 0 2
## virginica 6 0 44
# Principal component analysis on every iris column except Species, read
# from the Spark table registered as "iris".
pca_model <- tbl(sc, "iris") %>%
  select(-Species) %>%
  ml_pca()
## * No rows dropped by 'na.omit' call
print(pca_model)
## Explained variance:
##
## PC1 PC2 PC3 PC4
## 0.924618723 0.053066483 0.017102610 0.005212184
##
## Rotation:
## PC1 PC2 PC3 PC4
## Sepal_Length -0.36138659 -0.65658877 0.58202985 0.3154872
## Sepal_Width 0.08452251 -0.73016143 -0.59791083 -0.3197231
## Petal_Length -0.85667061 0.17337266 -0.07623608 -0.4798390
## Petal_Width -0.35828920 0.07548102 -0.54583143 0.7536574
# Random forest classifier for Species from the two petal measurements.
rf_model <- iris_tbl %>%
  ml_random_forest(
    Species ~ Petal_Length + Petal_Width,
    type = "classification"
  )
## * No rows dropped by 'na.omit' call
# Score iris, index the Species strings into `Species_idx`, and collect the
# result locally so it can be tabulated against the predictions.
rf_predict <- sdf_predict(rf_model, iris_tbl) %>%
  ft_string_indexer("Species", "Species_idx") %>%
  collect()
table(rf_predict$Species_idx, rf_predict$prediction)
##
## 0 1 2
## 0 49 1 0
## 1 0 50 0
## 2 0 0 50
# Split the Spark "iris" table 75/25 into `training` and `test` (fixed seed).
partitions <- sdf_partition(
  tbl(sc, "iris"),
  training = 0.75,
  test = 0.25,
  seed = 1099
)
# Regress Petal_Length on Petal_Width using the training split.
fit <- partitions$training %>%
  ml_linear_regression(Petal_Length ~ Petal_Width)
## * No rows dropped by 'na.omit' call
# Mean squared error of a fitted model's predictions on a Spark DataFrame.
#
# Args:
#   df:    Spark DataFrame to score; it must contain a `Petal_Length` column,
#          which is compared with the `prediction` column of the scored data.
#   model: Fitted model passed to sdf_predict(). Defaults to the global `fit`,
#          which the function previously read implicitly, so existing
#          one-argument calls behave exactly as before.
#
# Returns:
#   The collected summary holding a single `mse` column.
estimate_mse <- function(df, model = fit) {
  sdf_predict(model, df) %>%
    mutate(resid = Petal_Length - prediction) %>%
    summarize(mse = mean(resid ^ 2)) %>%
    collect()
}
# Evaluate the model's MSE on each element of `partitions`.
sapply(X = partitions, FUN = estimate_mse)
## $training.mse
## [1] 0.2374596
##
## $test.mse
## [1] 0.1898848
#
library(magrittr)
library(sparklyr)
library(dplyr)
library(ggplot2)
#sc <- spark_connect("local", version = "1.6.1")
# Upload mtcars to Spark under the table name "mtcars".
mtcars_tbl <- copy_to(sc, mtcars, name = "mtcars", overwrite = TRUE)
# Keep cars with hp >= 100, bucketize `cyl` into `cyl8` using the split points
# 0, 8 and 12, then split 50/50 into `training` and `test` (seed 888).
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  sdf_mutate(cyl8 = ft_bucketizer(cyl, c(0, 8, 12))) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 888)
# Fit a linear model to the training dataset.
fit <- partitions$training %>%
  ml_linear_regression(mpg ~ wt + cyl)
## * No rows dropped by 'na.omit' call
# Summarize the model.
summary(fit)
## Call: ml_linear_regression(., mpg ~ wt + cyl)
##
## Deviance Residuals::
## Min 1Q Median 3Q Max
## -2.0947 -1.2747 -0.1129 1.0876 2.2185
##
## Coefficients:
## Estimate Std. Error t value Pr(>|t|)
## (Intercept) 33.79558 2.67240 12.6462 4.92e-07 ***
## wt -1.59625 0.73729 -2.1650 0.05859 .
## cyl -1.58036 0.49670 -3.1817 0.01115 *
## ---
## Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
##
## R-Squared: 0.8267
## Root Mean Squared Error: 1.437
# Score the test split and pull the predictions into local R memory.
pred <- collect(sdf_predict(fit, partitions$test))
# Scatter plot of predicted against actual mpg with a dashed red reference
# line, a 1:1 aspect ratio, and a centred title.
ggplot(pred, aes(x = mpg, y = prediction)) +
  geom_abline(linetype = "dashed", colour = "red") +
  geom_point() +
  coord_fixed(ratio = 1) +
  labs(
    title = "Predicted vs. Actual Fuel Consumption",
    x = "Actual Fuel Consumption",
    y = "Predicted Fuel Consumption"
  ) +
  theme(plot.title = element_text(hjust = 0.5))
