Panoramica (sparklyr/dplyr)
dplyr è un pacchetto R per lavorare con i dati strutturati all’interno e all’esterno di R. dplyr rende la manipolazione dei dati per gli utenti R facile, coerente e performante. Con dplyr come interfaccia per manipolare Spark DataFrames, puoi:
- Selezionare, filtrare e aggregare i dati
- Usare le funzioni window functions (ad es. sampling)
- Eseguire join su DataFrames
- Raccogliere dati da Spark in R
Flights Data
Questa guida mostrerà alcuni dei verbi di manipolazione dei dati di base di dplyr usando i dati di nycflights13 pacchetto R. Questo pacchetto contiene dati per tutti i 336.776 voli in partenza da New York City nel 2013. Include anche utili metadati su compagnie aeree, aeroporti, meteo e aerei.
Connettiti al cluster e copia i dati dei voli utilizzando la funzione copy_to. Avvertenza: i dati di volo in nycflights13 sono utili per le dimostrazioni di dplyr perché sono piccoli, ma in pratica i dati di grandi dimensioni dovrebbero essere copiati raramente direttamente dagli oggetti R.
library(sparklyr)
library(dplyr)
library(nycflights13)
library(ggplot2)
sc <- spark_connect(master = "local", version = "2.0.0")
flights1 <- copy_to(sc, flights, "flights1")
airlines <- copy_to(sc, airlines, "airlines")
dplyr Verbs
Quando è connesso a Spark DataFrame, dplyr traduce i comandi in istruzioni Spark SQL. Le remote data sources utilizzano esattamente gli stessi cinque verbi delle local data sources. Ecco i cinque verbi con i corrispondenti comandi SQL:
- select ~ SELECT
- filter ~ WHERE
- arrange ~ ORDER
- group_by ~ GROUP BY
- mutate ~ operators: +, *, log, etc.
- summarise ~ aggregators: sum, min, sd, etc.
flights1 %>%
select(month, day, carrier, air_time) %>%
filter(month == 5, day == 17, carrier %in% c('UA', 'WN', 'AA', 'DL')) %>%
arrange(carrier) %>%
mutate(air_time_hours = air_time / 60)
flights1 %>%
group_by(month) %>%
summarize(count = n(), mean_dep_delay = mean(dep_delay), sd_dep_delay = sd(dep_delay)) %>%
arrange(month)
window functions
dplyr supporta le funzioni della finestra di Spark SQL. Le funzioni della finestra vengono utilizzate insieme a mutate e filter per risolvere una vasta gamma di problemi.
# Rank the top three largest delays for each carrier
flights1 %>%
group_by(carrier) %>%
mutate(rank = rank(desc(dep_delay))) %>%
filter(rank <= 3) %>%
select(carrier, year, month, day, dep_delay, rank)
SQL Translation
dplyr sa anche come convertire le funzioni R in Spark SQL:
sql0 <- flights1 %>%
select(carrier, dep_delay, origin) %>%
mutate(target = ifelse(origin == "JFK", 1L, 0L)) %>% # case statement
mutate(rank = rank(desc(dep_delay))) %>% # windows function
left_join(airlines, by = "carrier") # join
dbplyr::sql_render(sql0)
Laziness
Quando lavora con i database, dplyr cerca di essere il più pigro possibile. - Non trasferisce mai i dati in R senza richiederlo esplicitamente. - Ritarda a fare qualsiasi lavoro fino all’ultimo momento: raccoglie tutto ciò che vuoi fare e poi lo invia al database in un solo passaggio Ad esempio, prendi il seguente codice:
sql1 <- flights1 %>%
select(origin, dest, month, day, air_time) %>%
filter(origin == "JFK", dest == "SFO", air_time > 0) %>%
arrange(desc(air_time))
sql2 <- sql1 %>%
mutate(air_time_hours = air_time / 60)
sql2
Register e cashe
sdf_register(sql2, "jfk2sfo") #register the table
tbl_cache(sc, "jfk2sfo") # cashe the table
jfk2sfo_tbl <- tbl(sc, "jfk2sfo") # assign a reference to the table
Porta i dati da Spark a R
collect () esegue la query Spark e restituisce i risultati a R per ulteriori analisi e visualizzazione.
jfk2sfo <- collect(jfk2sfo_tbl)
jfk2sfo
Machine learning
È possibile orchestrare algoritmi di machine learning in un cluster Spark tramite le funzioni machine learning all’interno di sparklyr. Queste funzioni si connettono a un set di API di alto livello basate su DataFrames che consentono di creare e ottimizzare i flussi di lavoro di machine learning.
- Ecco un esempio in cui usiamo ml_linear_regression per adattare un modello di regressione lineare. Useremo il dataset mtcars e vediamo se possiamo prevedere il consumo di carburante di una vettura (mpg) in base al suo peso (wt) e il numero di cilindri che il motore contiene (cil). In ogni caso, assumeremo che la relazione tra mpg e ciascuna delle nostre funzionalità sia lineare.
# copiamo mtcars nel spark
mtcars_tbl <- copy_to(sc, mtcars)
# trasforma il nostro dataset e quindi esegui la partizione in 'training', 'test'
partitions <- mtcars_tbl %>%
filter(hp >= 100) %>%
mutate(cyl8 = cyl == 8) %>%
sdf_partition(training = 0.5, test = 0.5, seed = 1099)
# adatta un modello lineare al training dataset
fit <- partitions$training %>%
ml_linear_regression(response = "mpg", features = c("wt", "cyl"))
fit
we can use summary() to learn a bit more about the quality of our fit, and the statistical significance of each of our predictors.
summary(fit)
Usare pachetti non-spparkly
In questo caso usando il pacchetto dplyr Le query fatte sopra, quando si lavora con il dataset di nycflights13 danno lo stesso risultato di quando si lavora senza il pacchetto sparklyr aggiunto ad esso. Ma ci sono alcune differenze, per esempio guardando la query sotto
- Quando si lavora con sparklyr / dplyr:
delay <- flights_tbl %>%
group_by(tailnum) %>%
summarise(count = n(), dist = mean(distance), delay =
mean(arr_delay)) %>%
filter(count > 20, dist < 2000, !is.na(delay)) %>% collect
nrow(delay)
[1] 2961
summary(delay)
tailnum count dist delay
Length:2961 Min. : 21.0 Min. : 173.1 Min. :-17.0909
Class :character 1st Qu.: 41.0 1st Qu.: 635.1 1st Qu.: 0.6154
Mode :character Median : 78.0 Median : 980.9 Median : 5.2581
Mean :103.6 Mean : 986.0 Mean : 6.5679
3rd Qu.:126.0 3rd Qu.:1287.0 3rd Qu.: 11.3038
Max. :575.0 Max. :1999.9 Max. : 59.1219
- Quando si lavora con dplyr:
delay_1 <- flights_tbl_1 %>% group_by(tailnum) %>%
summarise(count = n(), dist = mean(distance),
delay = mean(arr_delay)) %>%
filter(count > 20, dist < 2000, !is.na(delay))
nrow(delay_1)
[1] 1319
summary(delay_1)
tailnum count dist delay
Length:1319 Min. : 21.00 Min. : 173.1 Min. :-17.09091
Class :character 1st Qu.: 34.00 1st Qu.: 818.4 1st Qu.: -0.00472
Mode :character Median : 60.00 Median :1084.9 Median : 3.92593
Mean : 70.56 Mean :1075.6 Mean : 5.27312
3rd Qu.: 99.00 3rd Qu.:1405.5 3rd Qu.: 9.06380
Max. :311.00 Max. :1999.9 Max. : 59.12195
Disegna i grafici per entrambi i casi:
ggplot(delay, aes(dist, delay)) +
geom_point(aes(size = count), alpha = 1/2) +
geom_smooth() + scale_size_area(max_size = 2) +
ggtitle("sparklyr/dplyr")

ggplot(delay_1, aes(dist, delay)) +
geom_point(aes(size = count), alpha = 1/2) +
geom_smooth() + scale_size_area(max_size = 2) +
ggtitle("dplyr")

Le differenze
Spark ha un impostazione na.rm = TRUE by default per tutti operazioni arimetiche; i valori NA sono già stati rimossi Il risultato sara uguale se mettiamo na.rm = TRUE sul codice equivalente in R
spaklyr è un’implementazione R dell’interfaccia di Spark. Spark in sé è una soluzione che ti permette di lavorare con i big data (pensa a terabyte o persino a petabyte) che è semplicemente impossibile su una singola macchina. Quindi useresti sparklyr quando e se non vuoi lavorare direttamente con Spark (attraverso Scala, ad esempio), ma vuoi rimanere nell’ecosistema R.
Inoltre, dplyr e sparklyr non sono opposti l’uno all’altro. Puoi (e probabilmente lo faranno) usarli insieme. Per quanto riguarda Spark, l’aumento della velocità di calcolo - in realtà non è il caso in senso stretto. Ad esempio, se vuoi costruire un modello e i dati si adattano alla memoria della tua macchina (per esempio, sono solo 100k righe), è più veloce farlo localmente e non preoccuparti di Spark
