Apache Spark è un framework per l’elaborazione di big data che possiede moduli integrati che consentono tra le altre cose di produrre streaming analytics, Machine Learning (ML) ed elaborazioni grafiche. I data scientist che utilizzano R possono trarre beneficio dall’apprendimento di Spark perché esso consente di eseguire analisi dei dati esplorative (EDA) e predittive su data set troppo grandi per essere gestiti in memoria.
Spark è scritto in Scala e gira su una Java Virtual Machine (JVM). Per utilizzarlo è necessario aver installato un Java Development Kit
In questo draft mostrerò come implementare in Spark dei semplici modelli utilizzando la libreria
sparklyr. Utilizzerò Spark in modalita locale.
La modalità locale è un modo utile per sperimentare analisi e applicazioni che eventualmente possono essere implementate su un cluster Spark multi-node.
dplyr per manipolare i datiIl workflow per implementare semplici modelli di ML utilizzando sparklyr è generalmente questo:
dplyrsdf_* e ft_* per generare nuove variabili o per partizionare il data setml_*Carichiamo i pacchetti necessari e ci connettiamo a Spark in locale
library(rmarkdown)
library(tidyverse)
library(sparklyr)
library(ggplot2)
sc <- spark_connect(master = "local")## carichiamo i dati
data(Boston, package = "MASS")
## copiamo il data set nel cluster Spark
boston <- copy_to(sc, Boston, "boston", overwrite = TRUE)
## splittiamo il data set in training e test
partitions <- boston %>%
sdf_partition(training = 0.7, test = 0.3, seed = 1099)
## implementiamo il modello sul training set
fit <- partitions$training %>%
ml_linear_regression(response = "medv", features = c("crim", "zn", "indus", "chas", "nox",
"rm", "age", "dis", "rad", "tax", "ptratio", "black", "lstat"))
## facciamo previsioni sul test set
prediction <- ml_predict(fit, dataset = partitions$test)
## calcoliamo il Mean Squared Error
MSE <- prediction %>%
select(medv, prediction) %>%
summarise(MSE = mean((medv-prediction)^2)) %>%
collect %>%
as.numeric()
## visualizziamo i risultati
prediction %>%
collect %>%
ggplot(aes(x = medv, y = prediction)) +
geom_point(col = tidyquant::palette_light()[[1]], alpha = 0.5, size = 2.5) +
geom_smooth(method = "lm", se = FALSE, col = "red") +
labs(y = "Valori predetti" , x = "Valori osservati", title = "Regressione lineare con sparklyr") +
annotate(geom = "text", x = 40, y = 10, size = 5, label = paste0("MSE = ", round(MSE, 1)))## copiamo il data set nel cluster
iris_tbl <- copy_to(dest = sc, df = iris, name = "iris", overwrite = TRUE)
## implementiamo il modello
kmeans_model <- ml_kmeans(x = iris_tbl, centers = 3, features = c("Petal_Width", "Petal_Length"))
## in questo caso non ho splittato il data set, osserviamo come si comporta l'algoritmo sul training set
predicted <- ml_predict(kmeans_model, iris_tbl) %>%
collect
## visualizziamo i risultati
ml_predict(kmeans_model) %>%
collect() %>%
ggplot(aes(Petal_Length, Petal_Width)) +
geom_point(aes(Petal_Width, Petal_Length, col = factor(prediction + 1)),
size = 2, alpha = 0.5) +
geom_point(data = kmeans_model$centers, aes(Petal_Width, Petal_Length),
col = scales::muted(c("red", "green", "blue")),
pch = 'x', size = 12) +
scale_color_discrete(name = "Cluster Previsto",
labels = paste("Cluster", 1:3)) +
labs(
x = "Petal Length",
y = "Petal Width",
title = "K-Means Clustering con sparklyr"
)## carichiamo i dati
beaver <- beaver2
beaver$activ <- factor(beaver$activ,
labels = c("Non-Active", "Active"))
## copiamo il data set nel cluster
beaver_tbl <- copy_to(sc, beaver, "beaver")
## splittiamo il data set
partitions <- beaver_tbl %>%
sdf_partition(training = 0.7, test = 0.3, seed = 1099)
## implementiamo il modello
glm_model <- partitions$training %>%
mutate(binary_response = as.numeric(activ == "Active")) %>%
ml_logistic_regression(binary_response ~ temp)
## facciamo previsioni sul test set
prediction <- ml_predict(glm_model, dataset = partitions$test)
predicted <- prediction %>%
collect %>%
mutate(predicted = as.factor(round(as.numeric(predicted_label), 0)))
actual <- partitions$test %>%
collect %>%
mutate(binary_response = as.factor(as.numeric(activ == "Active")))
## Visualiziamo la matrice di confusione
cm <- caret::confusionMatrix(actual$binary_response,predicted$predicted)
table(`Valori predetti` = predicted$predicted, `Valori osservati` = actual$binary_response) Valori osservati
Valori predetti 0 1
0 8 1
1 0 20
paste("Accuratezza del modello: ", round(cm$overall[1], 3))[1] "Accuratezza del modello: 0.966"
Al termine del lavoro chiudiamo la connessione con Spark
spark_disconnect(sc)Fonte: Rstudio - sparklyr
A work by Fabio Paderi