Apache Spark

Antes de falarmos sobre sparklyr vamos apresentar o Spark. O Apache Spark é um mecanismo de análise unificada para processamento de dados em larga escala.

Sparklyr

O sparklyr é um pacote que fornece uma interface entre o R e o Apache Spark. O sparklyr surgiu a pedido da comunidade do R querendo uma interface dplyr nativa para o Spark. O sparklyr também fornece interfaces para os algoritmos de aprendizado de máquina distribuída do Spark entre outros. Como destaques temos:

Instalação

Para instalação do pacote vamos usar a função install.packages()

install.packages("sparklyr")
library(sparklyr) # Carregando o pacote

Para utilizar o sparklyr e suas funções precisamos ter instalado também o spark. Para instalar o spark usaremos a função spark_install() do próprio pacote sparkly.

spark_install(version = "2.1.0")

Observação: Também é preciso ter instalado o Java no computador em sua versão mais recente. O Java pode ser baixado no seguinte link https://www.java.com

Conectando-se ao Spark

Você pode se conectar a ambas as instâncias locais do Spark, bem como aos clusters remotos do Spark.Para isso é necessário usar o Rstudio Server ou o Pro.

Existem Clusters experimentais, como o Livy (https://livy.incubator.apache.org/).Aqui vamos nos conectar a uma instância local:

sc <- spark_connect(master = "local")

A conexão Spark retornada sc fornece uma fonte de dados dplyr remota conectada ao Spark.

dplyr é um pacote R para trabalhar com dados estruturados dentro e fora de R. dplyr torna a manipulação de dados para usuários R fácil, consistente e de alto desempenho. Com o dplyr como uma interface para manipular o Spark DataFrames, você pode selecionar, filtrar e agregar dados, usar as funções da janela (por exemplo, para amostragem), executar junções em DataFrames e coletar dados do Spark em R.

Lendo dados

Você pode ler dados no Spark DataFrames usando as seguintes funções:

  • spark_read_csv: lê um arquivo CSV e fornece uma fonte de dados compatível com dplyr
  • spark_read_json: lê um arquivo JSON e fornece uma fonte de dados compatível com dplyr
  • spark_read_parquet: lê um arquivo em parquet e fornece uma fonte de dados compatível com dplyr

Independentemente do formato dos dados, o Spark oferece suporte à leitura de dados de diversas fontes de dados diferentes. Isso inclui dados armazenados em HDFS ( hdfs://protocolo), Amazon S3 ( s3n://protocolo) ou arquivos locais disponíveis para os nós de trabalho do Spark ( file://protocolo)

Cada uma dessas funções retorna uma referência a um Spark DataFrame que pode ser usado como uma tabela dplyr (tbl).

Dados

Para o exemplo usaremos como base os dados do nycflights13, pacote do R. Este pacote contém dados para todos os 336.776 voos que partiram de Nova York em 2013. Ele também inclui metadados úteis sobre companhias aéreas, aeroportos, clima e aviões. Os dados vêm do Bureau de Estatísticas de Transporte dos EUA e estão documentados em ?nycflights13.

install.packages("nycflights13") # Instalando o pacote nycflights13
library(nycflights13) # Carregando o pacote

Após conectados ao cluster (no caso “local”), copie os dados dos voos usando a função copy_to.

Advertência: Os dados de voo nycflights13 são convenientes para demonstrações dplyr porque são pequenos, mas na prática, dados grandes raramente devem ser copiados diretamente de objetos R.

library(dplyr) # Carregando o pacote
flights <- copy_to(sc, flights, "flights")

A variável flights representa no do R, o data frame dentro da conexão sc com o Spark, com ela podemos vizualizar uma parte dos dados(máximo de 1000 linhas) e manipulá-los.

Verbos DPLYR

Os verbos são comandos dplyr para manipular dados. Quando conectado a um Spark DataFrame, dplyr converte os comandos em instruções SQL do Spark. Fontes de dados remotas usam exatamente os mesmos cinco verbos que as fontes de dados locais. Aqui estão os cinco verbos com seus comandos SQL correspondentes:

  • select ~ SELECT
  • filter ~ WHERE
  • arrange ~ ORDER
  • summarise ~ aggregators: sum, min, sd, etc.
  • mutate ~ operators: +, *, log, etc.

Por exemplo, veja o seguinte código:

c1 <- filter(flights, day == 17, month == 5) %>% 
  select(c1, year, month, day, air_time) %>% 
  arrange(c2, year, month, day) %>% 
  mutate(c3, air_time_hours = air_time / 60)

Coletando para o R

carrierhours <- collect(c1)

ou diretamente

c1 <- filter(flights, day == 17, month == 5) %>%
  select(c1, year, month, day, air_time) %>%
  arrange(c2, year, month, day) %>% 
  mutate(c3, air_time_hours = air_time / 60) %>%
  collect()

collect() executa a consulta Spark e retorna os resultados para R para análise e visualização adicionais.

Exemplo em sala de aula:

#Retornará um objeto do tipo "spark"

df1 <- flights  %>% 
  filter(month == 11 , day == 13)

#Retornará um objeto do tipo "tibble"
  
df2 <- flights  %>% 
  filter(month == 11 , day == 13) %>% 
  collect()

Desconectando:

spark_disconnect(sc)

Concluindo

Como visto nos exemplos acima trabalhamos localmente, porém o objetivo do pacote sparklyr é trabalhar com um fluxo de dados (big data), ou seja, trabalhar em rede (cluster). Fazendo corretamente a conexão ao local do cluster através da função spark_connect() trabalharemos basicamente com as funções do pacote dplyr.

Como podemos ver na imagem, o sparklyr faz o papel de “ponte” para acessar os dados no data lake (dados armazenados) e permitir trabalhar com os pacotes no R.