Panoramica (sparklyr/dplyr)

dplyr è un pacchetto R per lavorare con i dati strutturati all’interno e all’esterno di R. dplyr rende la manipolazione dei dati per gli utenti R facile, coerente e performante. Con dplyr come interfaccia per manipolare Spark DataFrames, puoi:

Flights Data

Questa guida mostrerà alcuni dei verbi di manipolazione dei dati di base di dplyr usando i dati di nycflights13 pacchetto R. Questo pacchetto contiene dati per tutti i 336.776 voli in partenza da New York City nel 2013. Include anche utili metadati su compagnie aeree, aeroporti, meteo e aerei.

Connettiti al cluster e copia i dati dei voli utilizzando la funzione copy_to. Avvertenza: i dati di volo in nycflights13 sono utili per le dimostrazioni di dplyr perché sono piccoli, ma in pratica i dati di grandi dimensioni dovrebbero essere copiati raramente direttamente dagli oggetti R.

library(sparklyr)
library(dplyr)
library(nycflights13)
library(ggplot2)
sc <- spark_connect(master = "local", version = "2.0.0")
flights1 <- copy_to(sc, flights, "flights1")
airlines <- copy_to(sc, airlines, "airlines")

dplyr Verbs

Quando è connesso a Spark DataFrame, dplyr traduce i comandi in istruzioni Spark SQL. Le remote data sources utilizzano esattamente gli stessi cinque verbi delle local data sources. Ecco i cinque verbi con i corrispondenti comandi SQL:

  • select ~ SELECT
  • filter ~ WHERE
  • arrange ~ ORDER
  • group_by ~ GROUP BY
  • mutate ~ operators: +, *, log, etc.
  • summarise ~ aggregators: sum, min, sd, etc.
flights1 %>%
  select(month, day, carrier, air_time) %>%
  filter(month == 5, day == 17, carrier %in% c('UA', 'WN', 'AA', 'DL')) %>%
  arrange(carrier) %>%
  mutate(air_time_hours = air_time / 60)
flights1 %>%
  group_by(month) %>%
  summarize(count = n(), mean_dep_delay = mean(dep_delay), sd_dep_delay = sd(dep_delay)) %>%
  arrange(month)

window functions

dplyr supporta le funzioni della finestra di Spark SQL. Le funzioni della finestra vengono utilizzate insieme a mutate e filter per risolvere una vasta gamma di problemi.

# Rank the top three largest delays for each carrier
flights1 %>%
  group_by(carrier) %>%
  mutate(rank = rank(desc(dep_delay))) %>%
  filter(rank <= 3) %>%
  select(carrier, year, month, day, dep_delay, rank)

SQL Translation

dplyr sa anche come convertire le funzioni R in Spark SQL:

sql0 <- flights1 %>%
  select(carrier, dep_delay, origin) %>%
  mutate(target = ifelse(origin == "JFK", 1L, 0L)) %>% # case statement
  mutate(rank = rank(desc(dep_delay))) %>% # windows function
  left_join(airlines, by = "carrier") # join

dbplyr::sql_render(sql0)

Laziness

Quando lavora con i database, dplyr cerca di essere il più pigro possibile. - Non trasferisce mai i dati in R senza richiederlo esplicitamente. - Ritarda a fare qualsiasi lavoro fino all’ultimo momento: raccoglie tutto ciò che vuoi fare e poi lo invia al database in un solo passaggio Ad esempio, prendi il seguente codice:

sql1 <- flights1 %>%
  select(origin, dest, month, day, air_time) %>%
  filter(origin == "JFK", dest == "SFO", air_time > 0) %>%
  arrange(desc(air_time))

sql2 <- sql1 %>%
  mutate(air_time_hours = air_time / 60)

sql2

Register e cashe

sdf_register(sql2, "jfk2sfo") #register the table
tbl_cache(sc, "jfk2sfo") # cashe the table 
jfk2sfo_tbl <- tbl(sc, "jfk2sfo") # assign a reference to the table

Porta i dati da Spark a R

collect () esegue la query Spark e restituisce i risultati a R per ulteriori analisi e visualizzazione.

jfk2sfo <- collect(jfk2sfo_tbl)
jfk2sfo

Machine learning

È possibile orchestrare algoritmi di machine learning in un cluster Spark tramite le funzioni machine learning all’interno di sparklyr. Queste funzioni si connettono a un set di API di alto livello basate su DataFrames che consentono di creare e ottimizzare i flussi di lavoro di machine learning.

  • Ecco un esempio in cui usiamo ml_linear_regression per adattare un modello di regressione lineare. Useremo il dataset mtcars e vediamo se possiamo prevedere il consumo di carburante di una vettura (mpg) in base al suo peso (wt) e il numero di cilindri che il motore contiene (cil). In ogni caso, assumeremo che la relazione tra mpg e ciascuna delle nostre funzionalità sia lineare.
# copiamo mtcars nel spark
mtcars_tbl <- copy_to(sc, mtcars)

# trasforma il nostro dataset e quindi esegui la partizione in 'training', 'test'
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  mutate(cyl8 = cyl == 8) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)

# adatta un modello lineare al training dataset
fit <- partitions$training %>%
  ml_linear_regression(response = "mpg", features = c("wt", "cyl"))
fit

we can use summary() to learn a bit more about the quality of our fit, and the statistical significance of each of our predictors.

summary(fit)

Usare pachetti non-spparkly


In questo caso usando il pacchetto dplyr Le query fatte sopra, quando si lavora con il dataset di nycflights13 danno lo stesso risultato di quando si lavora senza il pacchetto sparklyr aggiunto ad esso. Ma ci sono alcune differenze, per esempio guardando la query sotto

delay <- flights_tbl %>%   
group_by(tailnum) %>%  
summarise(count = n(), dist = mean(distance), delay = 
mean(arr_delay)) %>% 
 filter(count > 20, dist < 2000, !is.na(delay)) %>%  collect
nrow(delay)  
[1] 2961
summary(delay)
   tailnum              count            dist            delay         
 Length:2961        Min.   : 21.0   Min.   : 173.1   Min.   :-17.0909  
 Class :character   1st Qu.: 41.0   1st Qu.: 635.1   1st Qu.:  0.6154  
 Mode  :character   Median : 78.0   Median : 980.9   Median :  5.2581  
                    Mean   :103.6   Mean   : 986.0   Mean   :  6.5679  
                    3rd Qu.:126.0   3rd Qu.:1287.0   3rd Qu.: 11.3038  
                    Max.   :575.0   Max.   :1999.9   Max.   : 59.1219  
delay_1 <- flights_tbl_1 %>%   group_by(tailnum) %>%  
summarise(count = n(), dist = mean(distance), 
delay = mean(arr_delay)) %>%  
filter(count > 20, dist < 2000, !is.na(delay))
nrow(delay_1) 
[1] 1319
summary(delay_1)
   tailnum              count             dist            delay          
 Length:1319        Min.   : 21.00   Min.   : 173.1   Min.   :-17.09091  
 Class :character   1st Qu.: 34.00   1st Qu.: 818.4   1st Qu.: -0.00472  
 Mode  :character   Median : 60.00   Median :1084.9   Median :  3.92593  
                    Mean   : 70.56   Mean   :1075.6   Mean   :  5.27312  
                    3rd Qu.: 99.00   3rd Qu.:1405.5   3rd Qu.:  9.06380  
                    Max.   :311.00   Max.   :1999.9   Max.   : 59.12195  

Disegna i grafici per entrambi i casi:

ggplot(delay, aes(dist, delay)) + 
 geom_point(aes(size = count), alpha = 1/2) +  
geom_smooth() +  scale_size_area(max_size = 2) +
ggtitle("sparklyr/dplyr")

ggplot(delay_1, aes(dist, delay)) +  
geom_point(aes(size = count), alpha = 1/2) +  
geom_smooth() +  scale_size_area(max_size = 2) +
ggtitle("dplyr")

Le differenze

  • Spark ha un impostazione na.rm = TRUE by default per tutti operazioni arimetiche; i valori NA sono già stati rimossi Il risultato sara uguale se mettiamo na.rm = TRUE sul codice equivalente in R

  • spaklyr è un’implementazione R dell’interfaccia di Spark. Spark in sé è una soluzione che ti permette di lavorare con i big data (pensa a terabyte o persino a petabyte) che è semplicemente impossibile su una singola macchina. Quindi useresti sparklyr quando e se non vuoi lavorare direttamente con Spark (attraverso Scala, ad esempio), ma vuoi rimanere nell’ecosistema R.

  • Inoltre, dplyr e sparklyr non sono opposti l’uno all’altro. Puoi (e probabilmente lo faranno) usarli insieme. Per quanto riguarda Spark, l’aumento della velocità di calcolo - in realtà non è il caso in senso stretto. Ad esempio, se vuoi costruire un modello e i dati si adattano alla memoria della tua macchina (per esempio, sono solo 100k righe), è più veloce farlo localmente e non preoccuparti di Spark

