Apache Spark è un framework per l’elaborazione di big data che possiede moduli integrati che consentono tra le altre cose di produrre streaming analytics, Machine Learning (ML) ed elaborazioni grafiche. I data scientist che utilizzano R possono trarre beneficio dall’apprendimento di Spark perché esso consente di eseguire analisi dei dati esplorative (EDA) e predittive su data set troppo grandi per essere gestiti in memoria.

Spark è scritto in Scala e gira su una Java Virtual Machine (JVM). Per utilizzarlo è necessario aver installato un Java Development Kit

In questo draft mostrerò come implementare in Spark dei semplici modelli utilizzando la libreria sparklyr. Utilizzerò Spark in modalita locale.
La modalità locale è un modo utile per sperimentare analisi e applicazioni che eventualmente possono essere implementate su un cluster Spark multi-node.


1 Alcune delle funzionalità di sparklyr

2 Machine Learning con sparklyr

Il workflow per implementare semplici modelli di ML utilizzando sparklyr è generalmente questo:

Carichiamo i pacchetti necessari e ci connettiamo a Spark in locale

library(rmarkdown)
library(tidyverse)
library(sparklyr)
library(ggplot2)

sc <- spark_connect(master = "local")

2.1 Regressione Lineare

## carichiamo i dati
data(Boston, package = "MASS") 

## copiamo il data set nel cluster Spark
boston <- copy_to(sc, Boston, "boston", overwrite = TRUE)

## splittiamo il data set in training e test 
partitions <- boston %>% 
      sdf_partition(training = 0.7, test = 0.3, seed = 1099) 

## implementiamo il modello sul training set
fit <- partitions$training %>% 
      ml_linear_regression(response = "medv", features = c("crim", "zn", "indus", "chas", "nox",
                                                           "rm", "age", "dis", "rad", "tax", "ptratio", "black", "lstat")) 

## facciamo previsioni sul test set
prediction <- ml_predict(fit, dataset = partitions$test) 

## calcoliamo il Mean Squared Error
MSE <- prediction %>% 
      select(medv, prediction) %>% 
      summarise(MSE = mean((medv-prediction)^2)) %>% 
      collect %>% 
      as.numeric() 

## visualizziamo i risultati
prediction %>% 
      collect %>% 
      ggplot(aes(x = medv, y = prediction)) + 
      geom_point(col = tidyquant::palette_light()[[1]], alpha = 0.5, size = 2.5) +
      geom_smooth(method = "lm", se = FALSE, col = "red") +
      labs(y = "Valori predetti" , x = "Valori osservati", title = "Regressione lineare con sparklyr") + 
      annotate(geom = "text", x = 40, y = 10, size = 5, label = paste0("MSE = ", round(MSE, 1)))

2.2 Kmeans Clustering

## copiamo il data set nel cluster
iris_tbl <- copy_to(dest = sc, df = iris, name = "iris", overwrite = TRUE)

## implementiamo il modello
kmeans_model <- ml_kmeans(x = iris_tbl, centers = 3, features = c("Petal_Width", "Petal_Length")) 

## in questo caso non ho splittato il data set, osserviamo come si comporta l'algoritmo sul training set
predicted <- ml_predict(kmeans_model, iris_tbl) %>% 
      collect

## visualizziamo i risultati
ml_predict(kmeans_model) %>% 
      collect() %>% 
      ggplot(aes(Petal_Length, Petal_Width)) +
      geom_point(aes(Petal_Width, Petal_Length, col = factor(prediction + 1)),
                 size = 2, alpha = 0.5) + 
      geom_point(data = kmeans_model$centers, aes(Petal_Width, Petal_Length),
                 col = scales::muted(c("red", "green", "blue")),
                 pch = 'x', size = 12) +
      scale_color_discrete(name = "Cluster Previsto",
                           labels = paste("Cluster", 1:3)) +
      labs(
            x = "Petal Length",
            y = "Petal Width",
            title = "K-Means Clustering con sparklyr"
      )

2.3 Regressione Logistica

## carichiamo i dati
beaver <- beaver2
beaver$activ <- factor(beaver$activ, 
                       labels = c("Non-Active", "Active"))

## copiamo il data set nel cluster
beaver_tbl <- copy_to(sc, beaver, "beaver")

## splittiamo il data set
partitions <- beaver_tbl %>% 
      sdf_partition(training = 0.7, test = 0.3, seed = 1099) 

## implementiamo il modello
glm_model <- partitions$training %>% 
      mutate(binary_response = as.numeric(activ == "Active")) %>% 
      ml_logistic_regression(binary_response ~ temp)

## facciamo previsioni sul test set
prediction <- ml_predict(glm_model, dataset = partitions$test)

predicted <- prediction %>% 
      collect %>% 
      mutate(predicted = as.factor(round(as.numeric(predicted_label), 0)))

actual <- partitions$test %>% 
      collect %>% 
      mutate(binary_response = as.factor(as.numeric(activ == "Active")))

## Visualiziamo la matrice di confusione
cm <- caret::confusionMatrix(actual$binary_response,predicted$predicted)
table(`Valori predetti` = predicted$predicted, `Valori osservati` = actual$binary_response)
               Valori osservati
Valori predetti  0  1
              0  8  1
              1  0 20
paste("Accuratezza del modello: ", round(cm$overall[1], 3))
[1] "Accuratezza del modello:  0.966"

Al termine del lavoro chiudiamo la connessione con Spark

spark_disconnect(sc)

Fonte: Rstudio - sparklyr

 

A work by Fabio Paderi

paderifabio@gmail.com